diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl
index 51b66f4f9..94497ef46 100644
--- a/apps/emqx/src/emqx_channel.erl
+++ b/apps/emqx/src/emqx_channel.erl
@@ -1210,6 +1210,10 @@ handle_call(
ChanInfo1 = info(NChannel),
emqx_cm:set_chan_info(ClientId, ChanInfo1#{sockinfo => SockInfo}),
reply(ok, reset_timer(keepalive, NChannel));
+handle_call({Type, _Meta} = MsgsReq, Channel = #channel{session = Session}) when
+ Type =:= mqueue_msgs; Type =:= inflight_msgs
+->
+ {reply, emqx_session:info(MsgsReq, Session), Channel};
handle_call(Req, Channel) ->
?SLOG(error, #{msg => "unexpected_call", call => Req}),
reply(ignored, Channel).
diff --git a/apps/emqx/src/emqx_inflight.erl b/apps/emqx/src/emqx_inflight.erl
index c342a846f..1f4433e57 100644
--- a/apps/emqx/src/emqx_inflight.erl
+++ b/apps/emqx/src/emqx_inflight.erl
@@ -36,7 +36,8 @@
max_size/1,
is_full/1,
is_empty/1,
- window/1
+ window/1,
+ query/2
]).
-export_type([inflight/0]).
@@ -138,3 +139,47 @@ size(?INFLIGHT(Tree)) ->
-spec max_size(inflight()) -> non_neg_integer().
max_size(?INFLIGHT(MaxSize, _Tree)) ->
MaxSize.
+
+-spec query(inflight(), #{continuation => Cont, limit := L}) ->
+ {[{key(), term()}], #{continuation := Cont, count := C}}
+when
+ Cont :: none | end_of_data | key(),
+ L :: non_neg_integer(),
+ C :: non_neg_integer().
+query(?INFLIGHT(Tree), #{limit := Limit} = Pager) ->
+ Count = gb_trees:size(Tree),
+ ContKey = maps:get(continuation, Pager, none),
+ {List, NextCont} = sublist(iterator_from(ContKey, Tree), Limit),
+ {List, #{continuation => NextCont, count => Count}}.
+
+iterator_from(none, Tree) ->
+ gb_trees:iterator(Tree);
+iterator_from(ContKey, Tree) ->
+ It = gb_trees:iterator_from(ContKey, Tree),
+ case gb_trees:next(It) of
+ {ContKey, _Val, ItNext} -> ItNext;
+ _ -> It
+ end.
+
+sublist(_It, 0) ->
+ {[], none};
+sublist(It, Len) ->
+ {ListAcc, HasNext} = sublist(It, Len, []),
+ {lists:reverse(ListAcc), next_cont(ListAcc, HasNext)}.
+
+sublist(It, 0, Acc) ->
+ {Acc, gb_trees:next(It) =/= none};
+sublist(It, Len, Acc) ->
+ case gb_trees:next(It) of
+ none ->
+ {Acc, false};
+ {Key, Val, ItNext} ->
+ sublist(ItNext, Len - 1, [{Key, Val} | Acc])
+ end.
+
+next_cont(_Acc, false) ->
+ end_of_data;
+next_cont([{LastKey, _LastVal} | _Acc], _HasNext) ->
+ LastKey;
+next_cont([], _HasNext) ->
+ end_of_data.
diff --git a/apps/emqx/src/emqx_mqueue.erl b/apps/emqx/src/emqx_mqueue.erl
index d085a196b..e3e54cdc9 100644
--- a/apps/emqx/src/emqx_mqueue.erl
+++ b/apps/emqx/src/emqx_mqueue.erl
@@ -68,7 +68,8 @@
stats/1,
dropped/1,
to_list/1,
- filter/2
+ filter/2,
+ query/2
]).
-define(NO_PRIORITY_TABLE, disabled).
@@ -171,6 +172,55 @@ filter(Pred, #mqueue{q = Q, len = Len, dropped = Droppend} = MQ) ->
MQ#mqueue{q = Q2, len = Len2, dropped = Droppend + Diff}
end.
+-spec query(mqueue(), #{continuation => ContMsgId, limit := L}) ->
+ {[message()], #{continuation := ContMsgId, count := C}}
+when
+ ContMsgId :: none | end_of_data | binary(),
+ C :: non_neg_integer(),
+ L :: non_neg_integer().
+query(MQ, #{limit := Limit} = Pager) ->
+ ContMsgId = maps:get(continuation, Pager, none),
+ {List, NextCont} = sublist(skip_until(MQ, ContMsgId), Limit),
+ {List, #{continuation => NextCont, count => len(MQ)}}.
+
+skip_until(MQ, none = _MsgId) ->
+ MQ;
+skip_until(MQ, MsgId) ->
+ do_skip_until(MQ, MsgId).
+
+do_skip_until(MQ, MsgId) ->
+ case out(MQ) of
+ {empty, MQ} ->
+ MQ;
+ {{value, #message{id = MsgId}}, Q1} ->
+ Q1;
+ {{value, _Msg}, Q1} ->
+ do_skip_until(Q1, MsgId)
+ end.
+
+sublist(_MQ, 0) ->
+ {[], none};
+sublist(MQ, Len) ->
+ {ListAcc, HasNext} = sublist(MQ, Len, []),
+ {lists:reverse(ListAcc), next_cont(ListAcc, HasNext)}.
+
+sublist(MQ, 0, Acc) ->
+ {Acc, element(1, out(MQ)) =/= empty};
+sublist(MQ, Len, Acc) ->
+ case out(MQ) of
+ {empty, _MQ} ->
+ {Acc, false};
+ {{value, Msg}, Q1} ->
+ sublist(Q1, Len - 1, [Msg | Acc])
+ end.
+
+next_cont(_Acc, false) ->
+ end_of_data;
+next_cont([#message{id = Id} | _Acc], _HasNext) ->
+ Id;
+next_cont([], _HasNext) ->
+ end_of_data.
+
to_list(MQ, Acc) ->
case out(MQ) of
{empty, _MQ} ->
diff --git a/apps/emqx/src/emqx_session.erl b/apps/emqx/src/emqx_session.erl
index a84ed4d83..de9af5388 100644
--- a/apps/emqx/src/emqx_session.erl
+++ b/apps/emqx/src/emqx_session.erl
@@ -527,7 +527,7 @@ info(Session) ->
-spec info
([atom()], t()) -> [{atom(), _Value}];
- (atom(), t()) -> _Value.
+ (atom() | {atom(), _Meta}, t()) -> _Value.
info(Keys, Session) when is_list(Keys) ->
[{Key, info(Key, Session)} || Key <- Keys];
info(impl, Session) ->
diff --git a/apps/emqx/src/emqx_session_mem.erl b/apps/emqx/src/emqx_session_mem.erl
index e5e60583f..dbb440f41 100644
--- a/apps/emqx/src/emqx_session_mem.erl
+++ b/apps/emqx/src/emqx_session_mem.erl
@@ -268,6 +268,9 @@ info(inflight_cnt, #session{inflight = Inflight}) ->
emqx_inflight:size(Inflight);
info(inflight_max, #session{inflight = Inflight}) ->
emqx_inflight:max_size(Inflight);
+info({inflight_msgs, PagerParams}, #session{inflight = Inflight}) ->
+ {InflightList, Meta} = emqx_inflight:query(Inflight, PagerParams),
+ {[I#inflight_data.message || {_, I} <- InflightList], Meta};
info(retry_interval, #session{retry_interval = Interval}) ->
Interval;
info(mqueue, #session{mqueue = MQueue}) ->
@@ -278,6 +281,8 @@ info(mqueue_max, #session{mqueue = MQueue}) ->
emqx_mqueue:max_len(MQueue);
info(mqueue_dropped, #session{mqueue = MQueue}) ->
emqx_mqueue:dropped(MQueue);
+info({mqueue_msgs, PagerParams}, #session{mqueue = MQueue}) ->
+ emqx_mqueue:query(MQueue, PagerParams);
info(next_pkt_id, #session{next_pkt_id = PacketId}) ->
PacketId;
info(awaiting_rel, #session{awaiting_rel = AwaitingRel}) ->
diff --git a/apps/emqx/test/emqx_inflight_SUITE.erl b/apps/emqx/test/emqx_inflight_SUITE.erl
index c3b7ca6fc..a220129af 100644
--- a/apps/emqx/test/emqx_inflight_SUITE.erl
+++ b/apps/emqx/test/emqx_inflight_SUITE.erl
@@ -116,5 +116,83 @@ t_window(_) ->
),
?assertEqual([a, b], emqx_inflight:window(Inflight)).
-% t_to_list(_) ->
-% error('TODO').
+t_to_list(_) ->
+ Inflight = lists:foldl(
+ fun(Seq, InflightAcc) ->
+ emqx_inflight:insert(Seq, integer_to_binary(Seq), InflightAcc)
+ end,
+ emqx_inflight:new(100),
+ [1, 6, 2, 3, 10, 7, 9, 8, 4, 5]
+ ),
+ ExpList = [{Seq, integer_to_binary(Seq)} || Seq <- lists:seq(1, 10)],
+ ?assertEqual(ExpList, emqx_inflight:to_list(Inflight)).
+
+t_query(_) ->
+ EmptyInflight = emqx_inflight:new(500),
+ ?assertMatch(
+ {[], #{continuation := end_of_data}}, emqx_inflight:query(EmptyInflight, #{limit => 50})
+ ),
+ ?assertMatch(
+ {[], #{continuation := end_of_data}},
+ emqx_inflight:query(EmptyInflight, #{continuation => <<"empty">>, limit => 50})
+ ),
+ ?assertMatch(
+ {[], #{continuation := end_of_data}},
+ emqx_inflight:query(EmptyInflight, #{continuation => none, limit => 50})
+ ),
+
+ Inflight = lists:foldl(
+ fun(Seq, QAcc) ->
+ emqx_inflight:insert(Seq, integer_to_binary(Seq), QAcc)
+ end,
+ EmptyInflight,
+ lists:reverse(lists:seq(1, 114))
+ ),
+
+ LastCont = lists:foldl(
+ fun(PageSeq, Cont) ->
+ Limit = 10,
+ PagerParams = #{continuation => Cont, limit => Limit},
+ {Page, #{continuation := NextCont} = Meta} = emqx_inflight:query(Inflight, PagerParams),
+ ?assertEqual(10, length(Page)),
+ ExpFirst = PageSeq * Limit - Limit + 1,
+ ExpLast = PageSeq * Limit,
+ ?assertEqual({ExpFirst, integer_to_binary(ExpFirst)}, lists:nth(1, Page)),
+ ?assertEqual({ExpLast, integer_to_binary(ExpLast)}, lists:nth(10, Page)),
+ ?assertMatch(
+ #{count := 114, continuation := IntCont} when is_integer(IntCont),
+ Meta
+ ),
+ NextCont
+ end,
+ none,
+ lists:seq(1, 11)
+ ),
+ {LastPartialPage, LastMeta} = emqx_inflight:query(Inflight, #{
+ continuation => LastCont, limit => 10
+ }),
+ ?assertEqual(4, length(LastPartialPage)),
+ ?assertEqual({111, <<"111">>}, lists:nth(1, LastPartialPage)),
+ ?assertEqual({114, <<"114">>}, lists:nth(4, LastPartialPage)),
+ ?assertMatch(#{continuation := end_of_data, count := 114}, LastMeta),
+
+ ?assertMatch(
+ {[], #{continuation := end_of_data}},
+ emqx_inflight:query(Inflight, #{continuation => <<"not-existing-cont-id">>, limit => 10})
+ ),
+
+ {LargePage, LargeMeta} = emqx_inflight:query(Inflight, #{limit => 1000}),
+ ?assertEqual(114, length(LargePage)),
+ ?assertEqual({1, <<"1">>}, hd(LargePage)),
+ ?assertEqual({114, <<"114">>}, lists:last(LargePage)),
+ ?assertMatch(#{continuation := end_of_data}, LargeMeta),
+
+ {FullPage, FullMeta} = emqx_inflight:query(Inflight, #{limit => 114}),
+ ?assertEqual(114, length(FullPage)),
+ ?assertEqual({1, <<"1">>}, hd(FullPage)),
+ ?assertEqual({114, <<"114">>}, lists:last(FullPage)),
+ ?assertMatch(#{continuation := end_of_data}, FullMeta),
+
+ {EmptyPage, EmptyMeta} = emqx_inflight:query(Inflight, #{limit => 0}),
+ ?assertEqual([], EmptyPage),
+ ?assertMatch(#{continuation := none, count := 114}, EmptyMeta).
diff --git a/apps/emqx/test/emqx_mqueue_SUITE.erl b/apps/emqx/test/emqx_mqueue_SUITE.erl
index 51db4b98a..f3e1629a7 100644
--- a/apps/emqx/test/emqx_mqueue_SUITE.erl
+++ b/apps/emqx/test/emqx_mqueue_SUITE.erl
@@ -282,6 +282,74 @@ t_dropped(_) ->
{Msg, Q2} = ?Q:in(Msg, Q1),
?assertEqual(1, ?Q:dropped(Q2)).
+t_query(_) ->
+ EmptyQ = ?Q:init(#{max_len => 500, store_qos0 => true}),
+ ?assertMatch({[], #{continuation := end_of_data}}, ?Q:query(EmptyQ, #{limit => 50})),
+ ?assertMatch(
+ {[], #{continuation := end_of_data}},
+ ?Q:query(EmptyQ, #{continuation => <<"empty">>, limit => 50})
+ ),
+ ?assertMatch(
+ {[], #{continuation := end_of_data}}, ?Q:query(EmptyQ, #{continuation => none, limit => 50})
+ ),
+
+ Q = lists:foldl(
+ fun(Seq, QAcc) ->
+ Msg = emqx_message:make(<<"t">>, integer_to_binary(Seq)),
+ {_, QAcc1} = ?Q:in(Msg, QAcc),
+ QAcc1
+ end,
+ EmptyQ,
+ lists:seq(1, 114)
+ ),
+
+ LastCont = lists:foldl(
+ fun(PageSeq, Cont) ->
+ Limit = 10,
+ PagerParams = #{continuation => Cont, limit => Limit},
+ {Page, #{continuation := NextCont} = Meta} = ?Q:query(Q, PagerParams),
+ ?assertEqual(10, length(Page)),
+ ExpFirstPayload = integer_to_binary(PageSeq * Limit - Limit + 1),
+ ExpLastPayload = integer_to_binary(PageSeq * Limit),
+ ?assertEqual(
+ ExpFirstPayload,
+ emqx_message:payload(lists:nth(1, Page)),
+ #{page_seq => PageSeq, page => Page, meta => Meta}
+ ),
+ ?assertEqual(ExpLastPayload, emqx_message:payload(lists:nth(10, Page))),
+ ?assertMatch(#{count := 114, continuation := <<_/binary>>}, Meta),
+ NextCont
+ end,
+ none,
+ lists:seq(1, 11)
+ ),
+ {LastPartialPage, LastMeta} = ?Q:query(Q, #{continuation => LastCont, limit => 10}),
+ ?assertEqual(4, length(LastPartialPage)),
+ ?assertEqual(<<"111">>, emqx_message:payload(lists:nth(1, LastPartialPage))),
+ ?assertEqual(<<"114">>, emqx_message:payload(lists:nth(4, LastPartialPage))),
+ ?assertMatch(#{continuation := end_of_data, count := 114}, LastMeta),
+
+ ?assertMatch(
+ {[], #{continuation := end_of_data}},
+ ?Q:query(Q, #{continuation => <<"not-existing-cont-id">>, limit => 10})
+ ),
+
+ {LargePage, LargeMeta} = ?Q:query(Q, #{limit => 1000}),
+ ?assertEqual(114, length(LargePage)),
+ ?assertEqual(<<"1">>, emqx_message:payload(hd(LargePage))),
+ ?assertEqual(<<"114">>, emqx_message:payload(lists:last(LargePage))),
+ ?assertMatch(#{continuation := end_of_data}, LargeMeta),
+
+ {FullPage, FullMeta} = ?Q:query(Q, #{limit => 114}),
+ ?assertEqual(114, length(FullPage)),
+ ?assertEqual(<<"1">>, emqx_message:payload(hd(FullPage))),
+ ?assertEqual(<<"114">>, emqx_message:payload(lists:last(FullPage))),
+ ?assertMatch(#{continuation := end_of_data}, FullMeta),
+
+ {EmptyPage, EmptyMeta} = ?Q:query(Q, #{limit => 0}),
+ ?assertEqual([], EmptyPage),
+ ?assertMatch(#{continuation := none, count := 114}, EmptyMeta).
+
conservation_prop() ->
?FORALL(
{Priorities, Messages},
diff --git a/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl b/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl
index 27b1ef2fc..719f62690 100644
--- a/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl
+++ b/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl
@@ -178,8 +178,36 @@ fields(hasnext) ->
>>,
Meta = #{desc => Desc, required => true},
[{hasnext, hoconsc:mk(boolean(), Meta)}];
+fields('after') ->
+ Desc = <<
+ "The value of \"last\" field returned in the previous response. It can then be used"
+ " in subsequent requests to get the next chunk of results.
"
+ "It is used instead of \"page\" parameter to traverse volatile data.
"
+ "Can be omitted or set to \"none\" to get the first chunk of data.
"
+ "\last\" = end_of_data\" is returned, if there is no more data.
"
+ "Sending \"after=end_of_table\" back to the server will result in \"400 Bad Request\""
+ " error response."
+ >>,
+ Meta = #{
+ in => query, desc => Desc, required => false, example => <<"AAYS53qRa0n07AAABFIACg">>
+ },
+ [{'after', hoconsc:mk(hoconsc:union([none, end_of_data, binary()]), Meta)}];
+fields(last) ->
+ Desc = <<
+ "An opaque token that can then be in subsequent requests to get "
+ " the next chunk of results: \"?after={last}\"
"
+ "if there is no more data, \"last\" = end_of_data\" is returned.
"
+ "Sending \"after=end_of_table\" back to the server will result in \"400 Bad Request\""
+ " error response."
+ >>,
+ Meta = #{
+ desc => Desc, required => true, example => <<"AAYS53qRa0n07AAABFIACg">>
+ },
+ [{last, hoconsc:mk(hoconsc:union([none, end_of_data, binary()]), Meta)}];
fields(meta) ->
- fields(page) ++ fields(limit) ++ fields(count) ++ fields(hasnext).
+ fields(page) ++ fields(limit) ++ fields(count) ++ fields(hasnext);
+fields(continuation_meta) ->
+ fields(last) ++ fields(count).
-spec schema_with_example(hocon_schema:type(), term()) -> hocon_schema:field_schema().
schema_with_example(Type, Example) ->
diff --git a/apps/emqx_management/src/emqx_mgmt.erl b/apps/emqx_management/src/emqx_mgmt.erl
index a1ab0bc3f..e470805d8 100644
--- a/apps/emqx_management/src/emqx_mgmt.erl
+++ b/apps/emqx_management/src/emqx_mgmt.erl
@@ -52,6 +52,7 @@
kickout_clients/1,
list_authz_cache/1,
list_client_subscriptions/1,
+ list_client_msgs/3,
client_subscriptions/2,
clean_authz_cache/1,
clean_authz_cache/2,
@@ -417,6 +418,12 @@ list_client_subscriptions_mem(ClientId) ->
end
end.
+list_client_msgs(MsgsType, ClientId, PagerParams) when
+ MsgsType =:= inflight_msgs;
+ MsgsType =:= mqueue_msgs
+->
+ call_client(ClientId, {MsgsType, PagerParams}).
+
client_subscriptions(Node, ClientId) ->
{Node, unwrap_rpc(emqx_broker_proto_v1:list_client_subscriptions(Node, ClientId))}.
diff --git a/apps/emqx_management/src/emqx_mgmt_api.erl b/apps/emqx_management/src/emqx_mgmt_api.erl
index be8f24bc3..bd3e5723c 100644
--- a/apps/emqx_management/src/emqx_mgmt_api.erl
+++ b/apps/emqx_management/src/emqx_mgmt_api.erl
@@ -22,6 +22,8 @@
-define(LONG_QUERY_TIMEOUT, 50000).
+-define(CONT_BASE64_OPTS, #{mode => urlsafe, padding => false}).
+
-export([
paginate/3
]).
@@ -37,6 +39,8 @@
-export([
parse_pager_params/1,
+ parse_cont_pager_params/2,
+ encode_cont_pager_params/2,
parse_qstring/2,
init_query_result/0,
init_query_state/5,
@@ -134,6 +138,33 @@ page(Params) ->
limit(Params) when is_map(Params) ->
maps:get(<<"limit">>, Params, emqx_mgmt:default_row_limit()).
+continuation(Params, Encoding) ->
+ try
+ decode_continuation(maps:get(<<"after">>, Params, none), Encoding)
+ catch
+ _:_ ->
+ error
+ end.
+
+decode_continuation(none, _Encoding) ->
+ none;
+decode_continuation(end_of_data, _Encoding) ->
+ %% Clients should not send "after=end_of_data" back to the server
+ error;
+decode_continuation(Cont, none) ->
+ Cont;
+decode_continuation(Cont, base64) ->
+ base64:decode(Cont, ?CONT_BASE64_OPTS).
+
+encode_continuation(none, _Encoding) ->
+ none;
+encode_continuation(end_of_data, _Encoding) ->
+ end_of_data;
+encode_continuation(Cont, none) ->
+ emqx_utils_conv:bin(Cont);
+encode_continuation(Cont, base64) ->
+ base64:encode(emqx_utils_conv:bin(Cont), ?CONT_BASE64_OPTS).
+
%%--------------------------------------------------------------------
%% Node Query
%%--------------------------------------------------------------------
@@ -632,6 +663,25 @@ parse_pager_params(Params) ->
false
end.
+-spec parse_cont_pager_params(map(), none | base64) ->
+ #{limit := pos_integer(), continuation := none | end_of_table | binary()} | false.
+parse_cont_pager_params(Params, Encoding) ->
+ Cont = continuation(Params, Encoding),
+ Limit = b2i(limit(Params)),
+ case Limit > 0 andalso Cont =/= error of
+ true ->
+ #{continuation => Cont, limit => Limit};
+ false ->
+ false
+ end.
+
+-spec encode_cont_pager_params(map(), none | base64) -> map().
+encode_cont_pager_params(#{continuation := Cont} = Meta, ContEncoding) ->
+ Meta1 = maps:remove(continuation, Meta),
+ Meta1#{last => encode_continuation(Cont, ContEncoding)};
+encode_cont_pager_params(Meta, _ContEncoding) ->
+ Meta.
+
%%--------------------------------------------------------------------
%% Types
%%--------------------------------------------------------------------
diff --git a/apps/emqx_management/src/emqx_mgmt_api_clients.erl b/apps/emqx_management/src/emqx_mgmt_api_clients.erl
index bc07d38bf..3555b5df6 100644
--- a/apps/emqx_management/src/emqx_mgmt_api_clients.erl
+++ b/apps/emqx_management/src/emqx_mgmt_api_clients.erl
@@ -22,8 +22,8 @@
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/emqx_cm.hrl").
-include_lib("hocon/include/hoconsc.hrl").
-
-include_lib("emqx/include/logger.hrl").
+-include_lib("emqx_utils/include/emqx_utils_api.hrl").
-include("emqx_mgmt.hrl").
@@ -47,7 +47,9 @@
unsubscribe/2,
unsubscribe_batch/2,
set_keepalive/2,
- sessions_count/2
+ sessions_count/2,
+ inflight_msgs/2,
+ mqueue_msgs/2
]).
-export([
@@ -101,6 +103,8 @@ paths() ->
"/clients/:clientid/unsubscribe",
"/clients/:clientid/unsubscribe/bulk",
"/clients/:clientid/keepalive",
+ "/clients/:clientid/mqueue_messages",
+ "/clients/:clientid/inflight_messages",
"/sessions_count"
].
@@ -391,6 +395,14 @@ schema("/clients/:clientid/keepalive") ->
}
}
};
+schema("/clients/:clientid/mqueue_messages") ->
+ ContExample = <<"AAYS53qRa0n07AAABFIACg">>,
+ RespSchema = ?R_REF(mqueue_messages),
+ client_msgs_schema(mqueue_msgs, ?DESC(get_client_mqueue_msgs), ContExample, RespSchema);
+schema("/clients/:clientid/inflight_messages") ->
+ ContExample = <<"10">>,
+ RespSchema = ?R_REF(inflight_messages),
+ client_msgs_schema(inflight_msgs, ?DESC(get_client_inflight_msgs), ContExample, RespSchema);
schema("/sessions_count") ->
#{
'operationId' => sessions_count,
@@ -621,6 +633,26 @@ fields(subscribe) ->
fields(unsubscribe) ->
[
{topic, hoconsc:mk(binary(), #{desc => <<"Topic">>, example => <<"testtopic/#">>})}
+ ];
+fields(mqueue_messages) ->
+ [
+ {data, hoconsc:mk(hoconsc:array(?REF(message)), #{desc => ?DESC(mqueue_msgs_list)})},
+ {meta, hoconsc:mk(hoconsc:ref(emqx_dashboard_swagger, continuation_meta), #{})}
+ ];
+fields(inflight_messages) ->
+ [
+ {data, hoconsc:mk(hoconsc:array(?REF(message)), #{desc => ?DESC(inflight_msgs_list)})},
+ {meta, hoconsc:mk(hoconsc:ref(emqx_dashboard_swagger, continuation_meta), #{})}
+ ];
+fields(message) ->
+ [
+ {msgid, hoconsc:mk(binary(), #{desc => ?DESC(msg_id)})},
+ {topic, hoconsc:mk(binary(), #{desc => ?DESC(msg_topic)})},
+ {qos, hoconsc:mk(emqx_schema:qos(), #{desc => ?DESC(msg_qos)})},
+ {publish_at, hoconsc:mk(integer(), #{desc => ?DESC(msg_publish_at)})},
+ {from_clientid, hoconsc:mk(binary(), #{desc => ?DESC(msg_from_clientid)})},
+ {from_username, hoconsc:mk(binary(), #{desc => ?DESC(msg_from_username)})},
+ {payload, hoconsc:mk(binary(), #{desc => ?DESC(msg_payload)})}
].
%%%==============================================================================================
@@ -693,6 +725,15 @@ set_keepalive(put, #{bindings := #{clientid := ClientID}, body := Body}) ->
end
end.
+mqueue_msgs(get, #{bindings := #{clientid := ClientID}, query_string := QString}) ->
+ list_client_msgs(mqueue_msgs, ClientID, QString).
+
+inflight_msgs(get, #{
+ bindings := #{clientid := ClientID},
+ query_string := QString
+}) ->
+ list_client_msgs(inflight_msgs, ClientID, QString).
+
%%%==============================================================================================
%% api apply
@@ -825,6 +866,62 @@ unsubscribe_batch(#{clientid := ClientID, topics := Topics}) ->
%%--------------------------------------------------------------------
%% internal function
+client_msgs_schema(OpId, Desc, ContExample, RespSchema) ->
+ #{
+ 'operationId' => OpId,
+ get => #{
+ description => Desc,
+ tags => ?TAGS,
+ parameters => client_msgs_params(),
+ responses => #{
+ 200 =>
+ emqx_dashboard_swagger:schema_with_example(RespSchema, #{
+ <<"data">> => [message_example()],
+ <<"meta">> => #{
+ <<"count">> => 100,
+ <<"last">> => ContExample
+ }
+ }),
+ 400 =>
+ emqx_dashboard_swagger:error_codes(
+ ['INVALID_PARAMETER'], <<"Invalid parameters">>
+ ),
+ 404 => emqx_dashboard_swagger:error_codes(
+ ['CLIENTID_NOT_FOUND'], <<"Client ID not found">>
+ )
+ }
+ }
+ }.
+
+client_msgs_params() ->
+ [
+ {clientid, hoconsc:mk(binary(), #{in => path})},
+ {payload,
+ hoconsc:mk(hoconsc:enum([none, base64, plain]), #{
+ in => query,
+ default => base64,
+ desc => <<
+ "Client's inflight/mqueue messages payload encoding."
+ " If set to `none`, no payload is returned in the response."
+ >>
+ })},
+ {max_payload_bytes,
+ hoconsc:mk(emqx_schema:bytesize(), #{
+ in => query,
+ default => <<"1MB">>,
+ desc => <<
+ "Client's inflight/mqueue messages payload limit."
+ " The total payload size of all messages in the response will not exceed this value."
+ " Messages beyond the limit will be silently omitted in the response."
+ " The only exception to this rule is when the first message payload"
+ " is already larger than the limit."
+ " In this case, the first message will be returned in the response."
+ >>
+ })},
+ hoconsc:ref(emqx_dashboard_swagger, 'after'),
+ hoconsc:ref(emqx_dashboard_swagger, limit)
+ ].
+
do_subscribe(ClientID, Topic0, Options) ->
try emqx_topic:parse(Topic0, Options) of
{Topic, Opts} ->
@@ -1037,6 +1134,42 @@ remove_live_sessions(Rows) ->
Rows
).
+list_client_msgs(MsgType, ClientID, QString) ->
+ case parse_cont_pager_params(QString, MsgType) of
+ false ->
+ {400, #{code => <<"INVALID_PARAMETER">>, message => <<"after_limit_invalid">>}};
+ PagerParams = #{} ->
+ case emqx_mgmt:list_client_msgs(MsgType, ClientID, PagerParams) of
+ {error, not_found} ->
+ {404, ?CLIENTID_NOT_FOUND};
+ {Msgs, Meta = #{}} when is_list(Msgs) ->
+ format_msgs_resp(MsgType, Msgs, Meta, QString)
+ end
+ end.
+
+parse_cont_pager_params(QString, MsgType) ->
+ case emqx_mgmt_api:parse_cont_pager_params(QString, cont_encoding(MsgType)) of
+ false ->
+ false;
+ PagerParams ->
+ maybe_cast_cont(MsgType, PagerParams)
+ end.
+
+maybe_cast_cont(inflight_msgs, #{continuation := Cont} = PagerParams) when is_binary(Cont) ->
+ try
+ PagerParams#{continuation => emqx_utils_conv:int(Cont)}
+ catch
+ _:_ ->
+ false
+ end;
+maybe_cast_cont(_, PagerParams) ->
+ PagerParams.
+
+%% integer packet id
+cont_encoding(inflight_msgs) -> none;
+%% binary message id
+cont_encoding(mqueue_msgs) -> base64.
+
%%--------------------------------------------------------------------
%% QueryString to Match Spec
@@ -1197,6 +1330,79 @@ format_persistent_session_info(ClientId, PSInfo0) ->
),
result_format_undefined_to_null(PSInfo).
+format_msgs_resp(MsgType, Msgs, Meta, QString) ->
+ #{
+ <<"payload">> := PayloadFmt,
+ <<"max_payload_bytes">> := MaxBytes
+ } = QString,
+ Meta1 = emqx_mgmt_api:encode_cont_pager_params(Meta, cont_encoding(MsgType)),
+ Resp = #{meta => Meta1, data => format_msgs(Msgs, PayloadFmt, MaxBytes)},
+ %% Make sure minirest won't set another content-type for self-encoded JSON response body
+ Headers = #{<<"content-type">> => <<"application/json">>},
+ case emqx_utils_json:safe_encode(Resp) of
+ {ok, RespBin} ->
+ {200, Headers, RespBin};
+ _Error when PayloadFmt =:= plain ->
+ ?BAD_REQUEST(
+ <<"INVALID_PARAMETER">>,
+ <<"Some message payloads are not JSON serializable">>
+ );
+ %% Unexpected internal error
+ Error ->
+ ?INTERNAL_ERROR(Error)
+ end.
+
+format_msgs([FirstMsg | Msgs], PayloadFmt, MaxBytes) ->
+ %% Always include at least one message payload, even if it exceeds the limit
+ {FirstMsg1, PayloadSize0} = format_msg(FirstMsg, PayloadFmt),
+ {Msgs1, _} =
+ catch lists:foldl(
+ fun(Msg, {MsgsAcc, SizeAcc} = Acc) ->
+ {Msg1, PayloadSize} = format_msg(Msg, PayloadFmt),
+ case SizeAcc + PayloadSize of
+ SizeAcc1 when SizeAcc1 =< MaxBytes ->
+ {[Msg1 | MsgsAcc], SizeAcc1};
+ _ ->
+ throw(Acc)
+ end
+ end,
+ {[FirstMsg1], PayloadSize0},
+ Msgs
+ ),
+ lists:reverse(Msgs1);
+format_msgs([], _PayloadFmt, _MaxBytes) ->
+ [].
+
+format_msg(
+ #message{
+ id = ID,
+ qos = Qos,
+ topic = Topic,
+ from = From,
+ timestamp = Timestamp,
+ headers = Headers,
+ payload = Payload
+ },
+ PayloadFmt
+) ->
+ Msg = #{
+ msgid => emqx_guid:to_hexstr(ID),
+ qos => Qos,
+ topic => Topic,
+ publish_at => Timestamp,
+ from_clientid => emqx_utils_conv:bin(From),
+ from_username => maps:get(username, Headers, <<>>)
+ },
+ format_payload(PayloadFmt, Msg, Payload).
+
+format_payload(none, Msg, _Payload) ->
+ {Msg, 0};
+format_payload(base64, Msg, Payload) ->
+ Payload1 = base64:encode(Payload),
+ {Msg#{payload => Payload1}, erlang:byte_size(Payload1)};
+format_payload(plain, Msg, Payload) ->
+ {Msg#{payload => Payload}, erlang:iolist_size(Payload)}.
+
%% format func helpers
take_maps_from_inner(_Key, Value, Current) when is_map(Value) ->
maps:merge(Current, Value);
@@ -1298,6 +1504,17 @@ client_example() ->
<<"recv_msg.qos0">> => 0
}.
+message_example() ->
+ #{
+ <<"msgid">> => <<"000611F460D57FA9F44500000D360002">>,
+ <<"topic">> => <<"t/test">>,
+ <<"qos">> => 0,
+ <<"publish_at">> => 1709055346487,
+ <<"from_clientid">> => <<"mqttx_59ac0a87">>,
+ <<"from_username">> => <<"test-user">>,
+ <<"payload">> => <<"eyJmb28iOiAiYmFyIn0=">>
+ }.
+
sessions_count(get, #{query_string := QString}) ->
Since = maps:get(<<"since">>, QString, 0),
Count = emqx_cm_registry_keeper:count(Since),
diff --git a/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl
index e4ad37e04..a007de829 100644
--- a/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl
+++ b/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl
@@ -23,16 +23,23 @@
-include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include_lib("emqx/include/asserts.hrl").
+-include_lib("emqx/include/emqx_mqtt.hrl").
all() ->
AllTCs = emqx_common_test_helpers:all(?MODULE),
[
- {group, persistent_sessions}
- | AllTCs -- persistent_session_testcases()
+ {group, persistent_sessions},
+ {group, msgs_base64_encoding},
+ {group, msgs_plain_encoding}
+ | AllTCs -- (persistent_session_testcases() ++ client_msgs_testcases())
].
groups() ->
- [{persistent_sessions, persistent_session_testcases()}].
+ [
+ {persistent_sessions, persistent_session_testcases()},
+ {msgs_base64_encoding, client_msgs_testcases()},
+ {msgs_plain_encoding, client_msgs_testcases()}
+ ].
persistent_session_testcases() ->
[
@@ -42,12 +49,19 @@ persistent_session_testcases() ->
t_persistent_sessions4,
t_persistent_sessions5
].
+client_msgs_testcases() ->
+ [
+ t_inflight_messages,
+ t_mqueue_messages
+ ].
init_per_suite(Config) ->
+ ok = snabbkaffe:start_trace(),
emqx_mgmt_api_test_util:init_suite(),
Config.
end_per_suite(_) ->
+ ok = snabbkaffe:stop(),
emqx_mgmt_api_test_util:end_suite().
init_per_group(persistent_sessions, Config) ->
@@ -67,6 +81,10 @@ init_per_group(persistent_sessions, Config) ->
#{work_dir => emqx_cth_suite:work_dir(Config)}
),
[{nodes, Nodes} | Config];
+init_per_group(msgs_base64_encoding, Config) ->
+ [{payload_encoding, base64} | Config];
+init_per_group(msgs_plain_encoding, Config) ->
+ [{payload_encoding, plain} | Config];
init_per_group(_Group, Config) ->
Config.
@@ -77,6 +95,21 @@ end_per_group(persistent_sessions, Config) ->
end_per_group(_Group, _Config) ->
ok.
+end_per_testcase(TC, _Config) when
+ TC =:= t_inflight_messages;
+ TC =:= t_mqueue_messages
+->
+ ClientId = atom_to_binary(TC),
+ lists:foreach(fun(P) -> exit(P, kill) end, emqx_cm:lookup_channels(local, ClientId)),
+ ok = emqx_common_test_helpers:wait_for(
+ ?FUNCTION_NAME,
+ ?LINE,
+ fun() -> [] =:= emqx_cm:lookup_channels(local, ClientId) end,
+ 5000
+ );
+end_per_testcase(_TC, _Config) ->
+ ok.
+
t_clients(_) ->
process_flag(trap_exit, true),
@@ -759,8 +792,206 @@ t_client_id_not_found(_Config) ->
?assertMatch({error, {Http, _, Body}}, PostFun(post, PathFun(["unsubscribe"]), UnsubBody)),
?assertMatch(
{error, {Http, _, Body}}, PostFun(post, PathFun(["unsubscribe", "bulk"]), [UnsubBody])
+ ),
+ %% Mqueue messages
+ ?assertMatch({error, {Http, _, Body}}, ReqFun(get, PathFun(["mqueue_messages"]))),
+ %% Inflight messages
+ ?assertMatch({error, {Http, _, Body}}, ReqFun(get, PathFun(["inflight_messages"]))).
+
+t_mqueue_messages(Config) ->
+ ClientId = atom_to_binary(?FUNCTION_NAME),
+ Topic = <<"t/test_mqueue_msgs">>,
+ Count = emqx_mgmt:default_row_limit(),
+ {ok, _Client} = client_with_mqueue(ClientId, Topic, Count),
+ Path = emqx_mgmt_api_test_util:api_path(["clients", ClientId, "mqueue_messages"]),
+ ?assert(Count =< emqx:get_config([mqtt, max_mqueue_len])),
+ AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
+ test_messages(Path, Topic, Count, AuthHeader, ?config(payload_encoding, Config)),
+
+ ?assertMatch(
+ {error, {_, 400, _}},
+ emqx_mgmt_api_test_util:request_api(
+ get, Path, "limit=10&after=not-base64%23%21", AuthHeader
+ )
+ ),
+ ?assertMatch(
+ {error, {_, 400, _}},
+ emqx_mgmt_api_test_util:request_api(
+ get, Path, "limit=-5&after=not-base64%23%21", AuthHeader
+ )
).
+t_inflight_messages(Config) ->
+ ClientId = atom_to_binary(?FUNCTION_NAME),
+ Topic = <<"t/test_inflight_msgs">>,
+ PubCount = emqx_mgmt:default_row_limit(),
+ {ok, Client} = client_with_inflight(ClientId, Topic, PubCount),
+ Path = emqx_mgmt_api_test_util:api_path(["clients", ClientId, "inflight_messages"]),
+ InflightLimit = emqx:get_config([mqtt, max_inflight]),
+ AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
+ test_messages(Path, Topic, InflightLimit, AuthHeader, ?config(payload_encoding, Config)),
+
+ ?assertMatch(
+ {error, {_, 400, _}},
+ emqx_mgmt_api_test_util:request_api(
+ get, Path, "limit=10&after=not-int", AuthHeader
+ )
+ ),
+ ?assertMatch(
+ {error, {_, 400, _}},
+ emqx_mgmt_api_test_util:request_api(
+ get, Path, "limit=-5&after=invalid-int", AuthHeader
+ )
+ ),
+ emqtt:stop(Client).
+
+client_with_mqueue(ClientId, Topic, Count) ->
+ {ok, Client} = emqtt:start_link([
+ {proto_ver, v5},
+ {clientid, ClientId},
+ {clean_start, false},
+ {properties, #{'Session-Expiry-Interval' => 120}}
+ ]),
+ {ok, _} = emqtt:connect(Client),
+ {ok, _, _} = emqtt:subscribe(Client, Topic, 1),
+ ok = emqtt:disconnect(Client),
+ publish_msgs(Topic, Count),
+ {ok, Client}.
+
+client_with_inflight(ClientId, Topic, Count) ->
+ {ok, Client} = emqtt:start_link([
+ {proto_ver, v5},
+ {clientid, ClientId},
+ {clean_start, true},
+ {auto_ack, never}
+ ]),
+ {ok, _} = emqtt:connect(Client),
+ {ok, _, _} = emqtt:subscribe(Client, Topic, 1),
+ publish_msgs(Topic, Count),
+ {ok, Client}.
+
+publish_msgs(Topic, Count) ->
+ lists:foreach(
+ fun(Seq) ->
+ emqx_broker:publish(emqx_message:make(undefined, ?QOS_1, Topic, integer_to_binary(Seq)))
+ end,
+ lists:seq(1, Count)
+ ).
+
+test_messages(Path, Topic, Count, AuthHeader, PayloadEncoding) ->
+ Qs0 = io_lib:format("payload=~s", [PayloadEncoding]),
+ {ok, MsgsResp} = emqx_mgmt_api_test_util:request_api(get, Path, Qs0, AuthHeader),
+ #{<<"meta">> := Meta, <<"data">> := Msgs} = emqx_utils_json:decode(MsgsResp),
+
+ ?assertMatch(
+ #{
+ <<"last">> := <<"end_of_data">>,
+ <<"count">> := Count
+ },
+ Meta
+ ),
+ ?assertEqual(length(Msgs), Count),
+ lists:foreach(
+ fun({Seq, #{<<"payload">> := P} = M}) ->
+ ?assertEqual(Seq, binary_to_integer(decode_payload(P, PayloadEncoding))),
+ ?assertMatch(
+ #{
+ <<"msgid">> := _,
+ <<"topic">> := Topic,
+ <<"qos">> := _,
+ <<"publish_at">> := _,
+ <<"from_clientid">> := _,
+ <<"from_username">> := _
+ },
+ M
+ )
+ end,
+ lists:zip(lists:seq(1, Count), Msgs)
+ ),
+
+ %% The first message payload is <<"1">>,
+ %% and when it is urlsafe base64 encoded (with no padding), it's <<"MQ">>,
+ %% so we cover both cases:
+ %% - when total payload size exceeds the limit,
+ %% - when the first message payload already exceeds the limit but is still returned in the response.
+ QsPayloadLimit = io_lib:format("payload=~s&max_payload_bytes=1", [PayloadEncoding]),
+ {ok, LimitedMsgsResp} = emqx_mgmt_api_test_util:request_api(
+ get, Path, QsPayloadLimit, AuthHeader
+ ),
+ #{<<"meta">> := _, <<"data">> := FirstMsgOnly} = emqx_utils_json:decode(LimitedMsgsResp),
+ ct:pal("~p", [FirstMsgOnly]),
+ ?assertEqual(1, length(FirstMsgOnly)),
+ ?assertEqual(
+ <<"1">>, decode_payload(maps:get(<<"payload">>, hd(FirstMsgOnly)), PayloadEncoding)
+ ),
+
+ Limit = 19,
+ LastCont = lists:foldl(
+ fun(PageSeq, Cont) ->
+ Qs = io_lib:format("payload=~s&after=~s&limit=~p", [PayloadEncoding, Cont, Limit]),
+ {ok, MsgsRespP} = emqx_mgmt_api_test_util:request_api(get, Path, Qs, AuthHeader),
+ #{
+ <<"meta">> := #{<<"last">> := NextCont} = MetaP,
+ <<"data">> := MsgsP
+ } = emqx_utils_json:decode(MsgsRespP),
+ ?assertMatch(#{<<"count">> := Count}, MetaP),
+ ?assertNotEqual(<<"end_of_data">>, NextCont),
+ ?assertEqual(length(MsgsP), Limit),
+ ExpFirstPayload = integer_to_binary(PageSeq * Limit - Limit + 1),
+ ExpLastPayload = integer_to_binary(PageSeq * Limit),
+ ?assertEqual(
+ ExpFirstPayload, decode_payload(maps:get(<<"payload">>, hd(MsgsP)), PayloadEncoding)
+ ),
+ ?assertEqual(
+ ExpLastPayload,
+ decode_payload(maps:get(<<"payload">>, lists:last(MsgsP)), PayloadEncoding)
+ ),
+ NextCont
+ end,
+ none,
+ lists:seq(1, Count div 19)
+ ),
+ LastPartialPage = Count div 19 + 1,
+ LastQs = io_lib:format("payload=~s&after=~s&limit=~p", [PayloadEncoding, LastCont, Limit]),
+ {ok, MsgsRespLastP} = emqx_mgmt_api_test_util:request_api(get, Path, LastQs, AuthHeader),
+ #{<<"meta">> := #{<<"last">> := EmptyCont} = MetaLastP, <<"data">> := MsgsLastP} = emqx_utils_json:decode(
+ MsgsRespLastP
+ ),
+ ?assertEqual(<<"end_of_data">>, EmptyCont),
+ ?assertMatch(#{<<"count">> := Count}, MetaLastP),
+
+ ?assertEqual(
+ integer_to_binary(LastPartialPage * Limit - Limit + 1),
+ decode_payload(maps:get(<<"payload">>, hd(MsgsLastP)), PayloadEncoding)
+ ),
+ ?assertEqual(
+ integer_to_binary(Count),
+ decode_payload(maps:get(<<"payload">>, lists:last(MsgsLastP)), PayloadEncoding)
+ ),
+
+ ExceedQs = io_lib:format("payload=~s&after=~s&limit=~p", [
+ PayloadEncoding, EmptyCont, Limit
+ ]),
+ ?assertMatch(
+ {error, {_, 400, _}},
+ emqx_mgmt_api_test_util:request_api(get, Path, ExceedQs, AuthHeader)
+ ),
+
+ %% Invalid common page params
+ ?assertMatch(
+ {error, {_, 400, _}},
+ emqx_mgmt_api_test_util:request_api(get, Path, "limit=0", AuthHeader)
+ ),
+ ?assertMatch(
+ {error, {_, 400, _}},
+ emqx_mgmt_api_test_util:request_api(get, Path, "limit=limit", AuthHeader)
+ ).
+
+decode_payload(Payload, base64) ->
+ base64:decode(Payload);
+decode_payload(Payload, _) ->
+ Payload.
+
t_subscribe_shared_topic(_Config) ->
ClientId = <<"client_subscribe_shared">>,
diff --git a/changes/ce/feat-12561.en.md b/changes/ce/feat-12561.en.md
new file mode 100644
index 000000000..072a71373
--- /dev/null
+++ b/changes/ce/feat-12561.en.md
@@ -0,0 +1,21 @@
+Implement HTTP APIs to get the list of client's inflight and mqueue messages.
+
+To get the first chunk of data:
+ - GET /clients/{clientid}/mqueue_messages?limit=100
+ - GET /clients/{clientid}/inflight_messages?limit=100
+
+Alternatively:
+ - GET /clients/{clientid}/mqueue_messages?limit=100&after=none
+ - GET /clients/{clientid}/inflight_messages?limit=100&after=none
+
+To get the next chunk of data:
+ - GET /clients/{clientid}/mqueue_messages?limit=100&after={last}
+ - GET /clients/{clientid}/inflight_messages?limit=100&after={last}
+
+ Where {last} is a value (opaque string token) of "meta.last" field from the previous response.
+
+ If there is no more data, "last" = "end_of_data" is returned.
+ If a subsequent request is attempted with "after=end_of_data", a "400 Bad Request" error response will be received.
+
+Mqueue messages are ordered according to the queue (FIFO) order.
+Inflight messages are ordered by MQTT Packet Id, which may not represent the chronological messages order.
diff --git a/rel/i18n/emqx_mgmt_api_clients.hocon b/rel/i18n/emqx_mgmt_api_clients.hocon
index 2431c09ec..3175715e0 100644
--- a/rel/i18n/emqx_mgmt_api_clients.hocon
+++ b/rel/i18n/emqx_mgmt_api_clients.hocon
@@ -35,6 +35,57 @@ get_client_subs.desc:
get_client_subs.label:
"""Get client subscriptions"""
+get_client_mqueue_msgs.desc:
+"""Get client mqueue messages"""
+get_client_mqueue_msgs.label:
+"""Get client mqueue messages"""
+
+get_client_inflight_msgs.desc:
+"""Get client inflight messages"""
+get_client_inflight_msgs.label:
+"""Get client inflight messages"""
+
+mqueue_msgs_list.desc:
+"""Client's mqueue messages list. The queue (FIFO) ordering is preserved."""
+mqueue_msgs_list.label:
+"""Client's mqueue messages"""
+
+inflight_msgs_list.desc:
+"""Client's inflight messages list.
+Ordered by MQTT Packet Id, which may not represent the chronological messages order."""
+inflight_msgs_list.label:
+"""Client's inflight messages"""
+
+msg_id.desc:
+"""Message ID."""
+msg_id.label:
+"""Message ID"""
+
+msg_topic.desc:
+"""Message topic."""
+msg_topic.label:
+"""Message Topic"""
+
+msg_qos.desc:
+"""Message QoS."""
+msg_topic.label:
+"""Message Qos"""
+
+msg_publish_at.desc:
+"""Message publish time, a millisecond precision Unix epoch timestamp."""
+msg_publish_at.label:
+"""Message Publish Time."""
+
+msg_from_clientid.desc:
+"""Message publisher's client ID."""
+msg_from_clientid.desc:
+"""Message publisher's Client ID"""
+
+msg_from_username.desc:
+"""Message publisher's username."""
+msg_from_username.label:
+"""Message Publisher's Username """
+
subscribe.desc:
"""Subscribe"""
subscribe.label: