From a2e761681e5442197b6afb977ad5b686dc88ef50 Mon Sep 17 00:00:00 2001 From: Serge Tupchii Date: Wed, 21 Feb 2024 16:49:17 +0200 Subject: [PATCH] feat: add client mqueue/inflight messages API --- apps/emqx/src/emqx_channel.erl | 4 + apps/emqx/src/emqx_inflight.erl | 47 +++- apps/emqx/src/emqx_mqueue.erl | 52 +++- apps/emqx/src/emqx_session.erl | 2 +- apps/emqx/src/emqx_session_mem.erl | 5 + apps/emqx/test/emqx_inflight_SUITE.erl | 82 +++++- apps/emqx/test/emqx_mqueue_SUITE.erl | 68 +++++ .../src/emqx_dashboard_swagger.erl | 30 ++- apps/emqx_management/src/emqx_mgmt.erl | 7 + apps/emqx_management/src/emqx_mgmt_api.erl | 50 ++++ .../src/emqx_mgmt_api_clients.erl | 221 +++++++++++++++- .../test/emqx_mgmt_api_clients_SUITE.erl | 237 +++++++++++++++++- changes/ce/feat-12561.en.md | 21 ++ rel/i18n/emqx_mgmt_api_clients.hocon | 51 ++++ 14 files changed, 866 insertions(+), 11 deletions(-) create mode 100644 changes/ce/feat-12561.en.md diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index 51b66f4f9..94497ef46 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -1210,6 +1210,10 @@ handle_call( ChanInfo1 = info(NChannel), emqx_cm:set_chan_info(ClientId, ChanInfo1#{sockinfo => SockInfo}), reply(ok, reset_timer(keepalive, NChannel)); +handle_call({Type, _Meta} = MsgsReq, Channel = #channel{session = Session}) when + Type =:= mqueue_msgs; Type =:= inflight_msgs +-> + {reply, emqx_session:info(MsgsReq, Session), Channel}; handle_call(Req, Channel) -> ?SLOG(error, #{msg => "unexpected_call", call => Req}), reply(ignored, Channel). diff --git a/apps/emqx/src/emqx_inflight.erl b/apps/emqx/src/emqx_inflight.erl index c342a846f..1f4433e57 100644 --- a/apps/emqx/src/emqx_inflight.erl +++ b/apps/emqx/src/emqx_inflight.erl @@ -36,7 +36,8 @@ max_size/1, is_full/1, is_empty/1, - window/1 + window/1, + query/2 ]). -export_type([inflight/0]). @@ -138,3 +139,47 @@ size(?INFLIGHT(Tree)) -> -spec max_size(inflight()) -> non_neg_integer(). max_size(?INFLIGHT(MaxSize, _Tree)) -> MaxSize. + +-spec query(inflight(), #{continuation => Cont, limit := L}) -> + {[{key(), term()}], #{continuation := Cont, count := C}} +when + Cont :: none | end_of_data | key(), + L :: non_neg_integer(), + C :: non_neg_integer(). +query(?INFLIGHT(Tree), #{limit := Limit} = Pager) -> + Count = gb_trees:size(Tree), + ContKey = maps:get(continuation, Pager, none), + {List, NextCont} = sublist(iterator_from(ContKey, Tree), Limit), + {List, #{continuation => NextCont, count => Count}}. + +iterator_from(none, Tree) -> + gb_trees:iterator(Tree); +iterator_from(ContKey, Tree) -> + It = gb_trees:iterator_from(ContKey, Tree), + case gb_trees:next(It) of + {ContKey, _Val, ItNext} -> ItNext; + _ -> It + end. + +sublist(_It, 0) -> + {[], none}; +sublist(It, Len) -> + {ListAcc, HasNext} = sublist(It, Len, []), + {lists:reverse(ListAcc), next_cont(ListAcc, HasNext)}. + +sublist(It, 0, Acc) -> + {Acc, gb_trees:next(It) =/= none}; +sublist(It, Len, Acc) -> + case gb_trees:next(It) of + none -> + {Acc, false}; + {Key, Val, ItNext} -> + sublist(ItNext, Len - 1, [{Key, Val} | Acc]) + end. + +next_cont(_Acc, false) -> + end_of_data; +next_cont([{LastKey, _LastVal} | _Acc], _HasNext) -> + LastKey; +next_cont([], _HasNext) -> + end_of_data. diff --git a/apps/emqx/src/emqx_mqueue.erl b/apps/emqx/src/emqx_mqueue.erl index d085a196b..e3e54cdc9 100644 --- a/apps/emqx/src/emqx_mqueue.erl +++ b/apps/emqx/src/emqx_mqueue.erl @@ -68,7 +68,8 @@ stats/1, dropped/1, to_list/1, - filter/2 + filter/2, + query/2 ]). -define(NO_PRIORITY_TABLE, disabled). @@ -171,6 +172,55 @@ filter(Pred, #mqueue{q = Q, len = Len, dropped = Droppend} = MQ) -> MQ#mqueue{q = Q2, len = Len2, dropped = Droppend + Diff} end. +-spec query(mqueue(), #{continuation => ContMsgId, limit := L}) -> + {[message()], #{continuation := ContMsgId, count := C}} +when + ContMsgId :: none | end_of_data | binary(), + C :: non_neg_integer(), + L :: non_neg_integer(). +query(MQ, #{limit := Limit} = Pager) -> + ContMsgId = maps:get(continuation, Pager, none), + {List, NextCont} = sublist(skip_until(MQ, ContMsgId), Limit), + {List, #{continuation => NextCont, count => len(MQ)}}. + +skip_until(MQ, none = _MsgId) -> + MQ; +skip_until(MQ, MsgId) -> + do_skip_until(MQ, MsgId). + +do_skip_until(MQ, MsgId) -> + case out(MQ) of + {empty, MQ} -> + MQ; + {{value, #message{id = MsgId}}, Q1} -> + Q1; + {{value, _Msg}, Q1} -> + do_skip_until(Q1, MsgId) + end. + +sublist(_MQ, 0) -> + {[], none}; +sublist(MQ, Len) -> + {ListAcc, HasNext} = sublist(MQ, Len, []), + {lists:reverse(ListAcc), next_cont(ListAcc, HasNext)}. + +sublist(MQ, 0, Acc) -> + {Acc, element(1, out(MQ)) =/= empty}; +sublist(MQ, Len, Acc) -> + case out(MQ) of + {empty, _MQ} -> + {Acc, false}; + {{value, Msg}, Q1} -> + sublist(Q1, Len - 1, [Msg | Acc]) + end. + +next_cont(_Acc, false) -> + end_of_data; +next_cont([#message{id = Id} | _Acc], _HasNext) -> + Id; +next_cont([], _HasNext) -> + end_of_data. + to_list(MQ, Acc) -> case out(MQ) of {empty, _MQ} -> diff --git a/apps/emqx/src/emqx_session.erl b/apps/emqx/src/emqx_session.erl index a84ed4d83..de9af5388 100644 --- a/apps/emqx/src/emqx_session.erl +++ b/apps/emqx/src/emqx_session.erl @@ -527,7 +527,7 @@ info(Session) -> -spec info ([atom()], t()) -> [{atom(), _Value}]; - (atom(), t()) -> _Value. + (atom() | {atom(), _Meta}, t()) -> _Value. info(Keys, Session) when is_list(Keys) -> [{Key, info(Key, Session)} || Key <- Keys]; info(impl, Session) -> diff --git a/apps/emqx/src/emqx_session_mem.erl b/apps/emqx/src/emqx_session_mem.erl index e5e60583f..dbb440f41 100644 --- a/apps/emqx/src/emqx_session_mem.erl +++ b/apps/emqx/src/emqx_session_mem.erl @@ -268,6 +268,9 @@ info(inflight_cnt, #session{inflight = Inflight}) -> emqx_inflight:size(Inflight); info(inflight_max, #session{inflight = Inflight}) -> emqx_inflight:max_size(Inflight); +info({inflight_msgs, PagerParams}, #session{inflight = Inflight}) -> + {InflightList, Meta} = emqx_inflight:query(Inflight, PagerParams), + {[I#inflight_data.message || {_, I} <- InflightList], Meta}; info(retry_interval, #session{retry_interval = Interval}) -> Interval; info(mqueue, #session{mqueue = MQueue}) -> @@ -278,6 +281,8 @@ info(mqueue_max, #session{mqueue = MQueue}) -> emqx_mqueue:max_len(MQueue); info(mqueue_dropped, #session{mqueue = MQueue}) -> emqx_mqueue:dropped(MQueue); +info({mqueue_msgs, PagerParams}, #session{mqueue = MQueue}) -> + emqx_mqueue:query(MQueue, PagerParams); info(next_pkt_id, #session{next_pkt_id = PacketId}) -> PacketId; info(awaiting_rel, #session{awaiting_rel = AwaitingRel}) -> diff --git a/apps/emqx/test/emqx_inflight_SUITE.erl b/apps/emqx/test/emqx_inflight_SUITE.erl index c3b7ca6fc..a220129af 100644 --- a/apps/emqx/test/emqx_inflight_SUITE.erl +++ b/apps/emqx/test/emqx_inflight_SUITE.erl @@ -116,5 +116,83 @@ t_window(_) -> ), ?assertEqual([a, b], emqx_inflight:window(Inflight)). -% t_to_list(_) -> -% error('TODO'). +t_to_list(_) -> + Inflight = lists:foldl( + fun(Seq, InflightAcc) -> + emqx_inflight:insert(Seq, integer_to_binary(Seq), InflightAcc) + end, + emqx_inflight:new(100), + [1, 6, 2, 3, 10, 7, 9, 8, 4, 5] + ), + ExpList = [{Seq, integer_to_binary(Seq)} || Seq <- lists:seq(1, 10)], + ?assertEqual(ExpList, emqx_inflight:to_list(Inflight)). + +t_query(_) -> + EmptyInflight = emqx_inflight:new(500), + ?assertMatch( + {[], #{continuation := end_of_data}}, emqx_inflight:query(EmptyInflight, #{limit => 50}) + ), + ?assertMatch( + {[], #{continuation := end_of_data}}, + emqx_inflight:query(EmptyInflight, #{continuation => <<"empty">>, limit => 50}) + ), + ?assertMatch( + {[], #{continuation := end_of_data}}, + emqx_inflight:query(EmptyInflight, #{continuation => none, limit => 50}) + ), + + Inflight = lists:foldl( + fun(Seq, QAcc) -> + emqx_inflight:insert(Seq, integer_to_binary(Seq), QAcc) + end, + EmptyInflight, + lists:reverse(lists:seq(1, 114)) + ), + + LastCont = lists:foldl( + fun(PageSeq, Cont) -> + Limit = 10, + PagerParams = #{continuation => Cont, limit => Limit}, + {Page, #{continuation := NextCont} = Meta} = emqx_inflight:query(Inflight, PagerParams), + ?assertEqual(10, length(Page)), + ExpFirst = PageSeq * Limit - Limit + 1, + ExpLast = PageSeq * Limit, + ?assertEqual({ExpFirst, integer_to_binary(ExpFirst)}, lists:nth(1, Page)), + ?assertEqual({ExpLast, integer_to_binary(ExpLast)}, lists:nth(10, Page)), + ?assertMatch( + #{count := 114, continuation := IntCont} when is_integer(IntCont), + Meta + ), + NextCont + end, + none, + lists:seq(1, 11) + ), + {LastPartialPage, LastMeta} = emqx_inflight:query(Inflight, #{ + continuation => LastCont, limit => 10 + }), + ?assertEqual(4, length(LastPartialPage)), + ?assertEqual({111, <<"111">>}, lists:nth(1, LastPartialPage)), + ?assertEqual({114, <<"114">>}, lists:nth(4, LastPartialPage)), + ?assertMatch(#{continuation := end_of_data, count := 114}, LastMeta), + + ?assertMatch( + {[], #{continuation := end_of_data}}, + emqx_inflight:query(Inflight, #{continuation => <<"not-existing-cont-id">>, limit => 10}) + ), + + {LargePage, LargeMeta} = emqx_inflight:query(Inflight, #{limit => 1000}), + ?assertEqual(114, length(LargePage)), + ?assertEqual({1, <<"1">>}, hd(LargePage)), + ?assertEqual({114, <<"114">>}, lists:last(LargePage)), + ?assertMatch(#{continuation := end_of_data}, LargeMeta), + + {FullPage, FullMeta} = emqx_inflight:query(Inflight, #{limit => 114}), + ?assertEqual(114, length(FullPage)), + ?assertEqual({1, <<"1">>}, hd(FullPage)), + ?assertEqual({114, <<"114">>}, lists:last(FullPage)), + ?assertMatch(#{continuation := end_of_data}, FullMeta), + + {EmptyPage, EmptyMeta} = emqx_inflight:query(Inflight, #{limit => 0}), + ?assertEqual([], EmptyPage), + ?assertMatch(#{continuation := none, count := 114}, EmptyMeta). diff --git a/apps/emqx/test/emqx_mqueue_SUITE.erl b/apps/emqx/test/emqx_mqueue_SUITE.erl index 51db4b98a..f3e1629a7 100644 --- a/apps/emqx/test/emqx_mqueue_SUITE.erl +++ b/apps/emqx/test/emqx_mqueue_SUITE.erl @@ -282,6 +282,74 @@ t_dropped(_) -> {Msg, Q2} = ?Q:in(Msg, Q1), ?assertEqual(1, ?Q:dropped(Q2)). +t_query(_) -> + EmptyQ = ?Q:init(#{max_len => 500, store_qos0 => true}), + ?assertMatch({[], #{continuation := end_of_data}}, ?Q:query(EmptyQ, #{limit => 50})), + ?assertMatch( + {[], #{continuation := end_of_data}}, + ?Q:query(EmptyQ, #{continuation => <<"empty">>, limit => 50}) + ), + ?assertMatch( + {[], #{continuation := end_of_data}}, ?Q:query(EmptyQ, #{continuation => none, limit => 50}) + ), + + Q = lists:foldl( + fun(Seq, QAcc) -> + Msg = emqx_message:make(<<"t">>, integer_to_binary(Seq)), + {_, QAcc1} = ?Q:in(Msg, QAcc), + QAcc1 + end, + EmptyQ, + lists:seq(1, 114) + ), + + LastCont = lists:foldl( + fun(PageSeq, Cont) -> + Limit = 10, + PagerParams = #{continuation => Cont, limit => Limit}, + {Page, #{continuation := NextCont} = Meta} = ?Q:query(Q, PagerParams), + ?assertEqual(10, length(Page)), + ExpFirstPayload = integer_to_binary(PageSeq * Limit - Limit + 1), + ExpLastPayload = integer_to_binary(PageSeq * Limit), + ?assertEqual( + ExpFirstPayload, + emqx_message:payload(lists:nth(1, Page)), + #{page_seq => PageSeq, page => Page, meta => Meta} + ), + ?assertEqual(ExpLastPayload, emqx_message:payload(lists:nth(10, Page))), + ?assertMatch(#{count := 114, continuation := <<_/binary>>}, Meta), + NextCont + end, + none, + lists:seq(1, 11) + ), + {LastPartialPage, LastMeta} = ?Q:query(Q, #{continuation => LastCont, limit => 10}), + ?assertEqual(4, length(LastPartialPage)), + ?assertEqual(<<"111">>, emqx_message:payload(lists:nth(1, LastPartialPage))), + ?assertEqual(<<"114">>, emqx_message:payload(lists:nth(4, LastPartialPage))), + ?assertMatch(#{continuation := end_of_data, count := 114}, LastMeta), + + ?assertMatch( + {[], #{continuation := end_of_data}}, + ?Q:query(Q, #{continuation => <<"not-existing-cont-id">>, limit => 10}) + ), + + {LargePage, LargeMeta} = ?Q:query(Q, #{limit => 1000}), + ?assertEqual(114, length(LargePage)), + ?assertEqual(<<"1">>, emqx_message:payload(hd(LargePage))), + ?assertEqual(<<"114">>, emqx_message:payload(lists:last(LargePage))), + ?assertMatch(#{continuation := end_of_data}, LargeMeta), + + {FullPage, FullMeta} = ?Q:query(Q, #{limit => 114}), + ?assertEqual(114, length(FullPage)), + ?assertEqual(<<"1">>, emqx_message:payload(hd(FullPage))), + ?assertEqual(<<"114">>, emqx_message:payload(lists:last(FullPage))), + ?assertMatch(#{continuation := end_of_data}, FullMeta), + + {EmptyPage, EmptyMeta} = ?Q:query(Q, #{limit => 0}), + ?assertEqual([], EmptyPage), + ?assertMatch(#{continuation := none, count := 114}, EmptyMeta). + conservation_prop() -> ?FORALL( {Priorities, Messages}, diff --git a/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl b/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl index 27b1ef2fc..719f62690 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl @@ -178,8 +178,36 @@ fields(hasnext) -> >>, Meta = #{desc => Desc, required => true}, [{hasnext, hoconsc:mk(boolean(), Meta)}]; +fields('after') -> + Desc = << + "The value of \"last\" field returned in the previous response. It can then be used" + " in subsequent requests to get the next chunk of results.
" + "It is used instead of \"page\" parameter to traverse volatile data.
" + "Can be omitted or set to \"none\" to get the first chunk of data.
" + "\last\" = end_of_data\" is returned, if there is no more data.
" + "Sending \"after=end_of_table\" back to the server will result in \"400 Bad Request\"" + " error response." + >>, + Meta = #{ + in => query, desc => Desc, required => false, example => <<"AAYS53qRa0n07AAABFIACg">> + }, + [{'after', hoconsc:mk(hoconsc:union([none, end_of_data, binary()]), Meta)}]; +fields(last) -> + Desc = << + "An opaque token that can then be in subsequent requests to get " + " the next chunk of results: \"?after={last}\"
" + "if there is no more data, \"last\" = end_of_data\" is returned.
" + "Sending \"after=end_of_table\" back to the server will result in \"400 Bad Request\"" + " error response." + >>, + Meta = #{ + desc => Desc, required => true, example => <<"AAYS53qRa0n07AAABFIACg">> + }, + [{last, hoconsc:mk(hoconsc:union([none, end_of_data, binary()]), Meta)}]; fields(meta) -> - fields(page) ++ fields(limit) ++ fields(count) ++ fields(hasnext). + fields(page) ++ fields(limit) ++ fields(count) ++ fields(hasnext); +fields(continuation_meta) -> + fields(last) ++ fields(count). -spec schema_with_example(hocon_schema:type(), term()) -> hocon_schema:field_schema(). schema_with_example(Type, Example) -> diff --git a/apps/emqx_management/src/emqx_mgmt.erl b/apps/emqx_management/src/emqx_mgmt.erl index a1ab0bc3f..e470805d8 100644 --- a/apps/emqx_management/src/emqx_mgmt.erl +++ b/apps/emqx_management/src/emqx_mgmt.erl @@ -52,6 +52,7 @@ kickout_clients/1, list_authz_cache/1, list_client_subscriptions/1, + list_client_msgs/3, client_subscriptions/2, clean_authz_cache/1, clean_authz_cache/2, @@ -417,6 +418,12 @@ list_client_subscriptions_mem(ClientId) -> end end. +list_client_msgs(MsgsType, ClientId, PagerParams) when + MsgsType =:= inflight_msgs; + MsgsType =:= mqueue_msgs +-> + call_client(ClientId, {MsgsType, PagerParams}). + client_subscriptions(Node, ClientId) -> {Node, unwrap_rpc(emqx_broker_proto_v1:list_client_subscriptions(Node, ClientId))}. diff --git a/apps/emqx_management/src/emqx_mgmt_api.erl b/apps/emqx_management/src/emqx_mgmt_api.erl index be8f24bc3..bd3e5723c 100644 --- a/apps/emqx_management/src/emqx_mgmt_api.erl +++ b/apps/emqx_management/src/emqx_mgmt_api.erl @@ -22,6 +22,8 @@ -define(LONG_QUERY_TIMEOUT, 50000). +-define(CONT_BASE64_OPTS, #{mode => urlsafe, padding => false}). + -export([ paginate/3 ]). @@ -37,6 +39,8 @@ -export([ parse_pager_params/1, + parse_cont_pager_params/2, + encode_cont_pager_params/2, parse_qstring/2, init_query_result/0, init_query_state/5, @@ -134,6 +138,33 @@ page(Params) -> limit(Params) when is_map(Params) -> maps:get(<<"limit">>, Params, emqx_mgmt:default_row_limit()). +continuation(Params, Encoding) -> + try + decode_continuation(maps:get(<<"after">>, Params, none), Encoding) + catch + _:_ -> + error + end. + +decode_continuation(none, _Encoding) -> + none; +decode_continuation(end_of_data, _Encoding) -> + %% Clients should not send "after=end_of_data" back to the server + error; +decode_continuation(Cont, none) -> + Cont; +decode_continuation(Cont, base64) -> + base64:decode(Cont, ?CONT_BASE64_OPTS). + +encode_continuation(none, _Encoding) -> + none; +encode_continuation(end_of_data, _Encoding) -> + end_of_data; +encode_continuation(Cont, none) -> + emqx_utils_conv:bin(Cont); +encode_continuation(Cont, base64) -> + base64:encode(emqx_utils_conv:bin(Cont), ?CONT_BASE64_OPTS). + %%-------------------------------------------------------------------- %% Node Query %%-------------------------------------------------------------------- @@ -632,6 +663,25 @@ parse_pager_params(Params) -> false end. +-spec parse_cont_pager_params(map(), none | base64) -> + #{limit := pos_integer(), continuation := none | end_of_table | binary()} | false. +parse_cont_pager_params(Params, Encoding) -> + Cont = continuation(Params, Encoding), + Limit = b2i(limit(Params)), + case Limit > 0 andalso Cont =/= error of + true -> + #{continuation => Cont, limit => Limit}; + false -> + false + end. + +-spec encode_cont_pager_params(map(), none | base64) -> map(). +encode_cont_pager_params(#{continuation := Cont} = Meta, ContEncoding) -> + Meta1 = maps:remove(continuation, Meta), + Meta1#{last => encode_continuation(Cont, ContEncoding)}; +encode_cont_pager_params(Meta, _ContEncoding) -> + Meta. + %%-------------------------------------------------------------------- %% Types %%-------------------------------------------------------------------- diff --git a/apps/emqx_management/src/emqx_mgmt_api_clients.erl b/apps/emqx_management/src/emqx_mgmt_api_clients.erl index bc07d38bf..3555b5df6 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_clients.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_clients.erl @@ -22,8 +22,8 @@ -include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/emqx_cm.hrl"). -include_lib("hocon/include/hoconsc.hrl"). - -include_lib("emqx/include/logger.hrl"). +-include_lib("emqx_utils/include/emqx_utils_api.hrl"). -include("emqx_mgmt.hrl"). @@ -47,7 +47,9 @@ unsubscribe/2, unsubscribe_batch/2, set_keepalive/2, - sessions_count/2 + sessions_count/2, + inflight_msgs/2, + mqueue_msgs/2 ]). -export([ @@ -101,6 +103,8 @@ paths() -> "/clients/:clientid/unsubscribe", "/clients/:clientid/unsubscribe/bulk", "/clients/:clientid/keepalive", + "/clients/:clientid/mqueue_messages", + "/clients/:clientid/inflight_messages", "/sessions_count" ]. @@ -391,6 +395,14 @@ schema("/clients/:clientid/keepalive") -> } } }; +schema("/clients/:clientid/mqueue_messages") -> + ContExample = <<"AAYS53qRa0n07AAABFIACg">>, + RespSchema = ?R_REF(mqueue_messages), + client_msgs_schema(mqueue_msgs, ?DESC(get_client_mqueue_msgs), ContExample, RespSchema); +schema("/clients/:clientid/inflight_messages") -> + ContExample = <<"10">>, + RespSchema = ?R_REF(inflight_messages), + client_msgs_schema(inflight_msgs, ?DESC(get_client_inflight_msgs), ContExample, RespSchema); schema("/sessions_count") -> #{ 'operationId' => sessions_count, @@ -621,6 +633,26 @@ fields(subscribe) -> fields(unsubscribe) -> [ {topic, hoconsc:mk(binary(), #{desc => <<"Topic">>, example => <<"testtopic/#">>})} + ]; +fields(mqueue_messages) -> + [ + {data, hoconsc:mk(hoconsc:array(?REF(message)), #{desc => ?DESC(mqueue_msgs_list)})}, + {meta, hoconsc:mk(hoconsc:ref(emqx_dashboard_swagger, continuation_meta), #{})} + ]; +fields(inflight_messages) -> + [ + {data, hoconsc:mk(hoconsc:array(?REF(message)), #{desc => ?DESC(inflight_msgs_list)})}, + {meta, hoconsc:mk(hoconsc:ref(emqx_dashboard_swagger, continuation_meta), #{})} + ]; +fields(message) -> + [ + {msgid, hoconsc:mk(binary(), #{desc => ?DESC(msg_id)})}, + {topic, hoconsc:mk(binary(), #{desc => ?DESC(msg_topic)})}, + {qos, hoconsc:mk(emqx_schema:qos(), #{desc => ?DESC(msg_qos)})}, + {publish_at, hoconsc:mk(integer(), #{desc => ?DESC(msg_publish_at)})}, + {from_clientid, hoconsc:mk(binary(), #{desc => ?DESC(msg_from_clientid)})}, + {from_username, hoconsc:mk(binary(), #{desc => ?DESC(msg_from_username)})}, + {payload, hoconsc:mk(binary(), #{desc => ?DESC(msg_payload)})} ]. %%%============================================================================================== @@ -693,6 +725,15 @@ set_keepalive(put, #{bindings := #{clientid := ClientID}, body := Body}) -> end end. +mqueue_msgs(get, #{bindings := #{clientid := ClientID}, query_string := QString}) -> + list_client_msgs(mqueue_msgs, ClientID, QString). + +inflight_msgs(get, #{ + bindings := #{clientid := ClientID}, + query_string := QString +}) -> + list_client_msgs(inflight_msgs, ClientID, QString). + %%%============================================================================================== %% api apply @@ -825,6 +866,62 @@ unsubscribe_batch(#{clientid := ClientID, topics := Topics}) -> %%-------------------------------------------------------------------- %% internal function +client_msgs_schema(OpId, Desc, ContExample, RespSchema) -> + #{ + 'operationId' => OpId, + get => #{ + description => Desc, + tags => ?TAGS, + parameters => client_msgs_params(), + responses => #{ + 200 => + emqx_dashboard_swagger:schema_with_example(RespSchema, #{ + <<"data">> => [message_example()], + <<"meta">> => #{ + <<"count">> => 100, + <<"last">> => ContExample + } + }), + 400 => + emqx_dashboard_swagger:error_codes( + ['INVALID_PARAMETER'], <<"Invalid parameters">> + ), + 404 => emqx_dashboard_swagger:error_codes( + ['CLIENTID_NOT_FOUND'], <<"Client ID not found">> + ) + } + } + }. + +client_msgs_params() -> + [ + {clientid, hoconsc:mk(binary(), #{in => path})}, + {payload, + hoconsc:mk(hoconsc:enum([none, base64, plain]), #{ + in => query, + default => base64, + desc => << + "Client's inflight/mqueue messages payload encoding." + " If set to `none`, no payload is returned in the response." + >> + })}, + {max_payload_bytes, + hoconsc:mk(emqx_schema:bytesize(), #{ + in => query, + default => <<"1MB">>, + desc => << + "Client's inflight/mqueue messages payload limit." + " The total payload size of all messages in the response will not exceed this value." + " Messages beyond the limit will be silently omitted in the response." + " The only exception to this rule is when the first message payload" + " is already larger than the limit." + " In this case, the first message will be returned in the response." + >> + })}, + hoconsc:ref(emqx_dashboard_swagger, 'after'), + hoconsc:ref(emqx_dashboard_swagger, limit) + ]. + do_subscribe(ClientID, Topic0, Options) -> try emqx_topic:parse(Topic0, Options) of {Topic, Opts} -> @@ -1037,6 +1134,42 @@ remove_live_sessions(Rows) -> Rows ). +list_client_msgs(MsgType, ClientID, QString) -> + case parse_cont_pager_params(QString, MsgType) of + false -> + {400, #{code => <<"INVALID_PARAMETER">>, message => <<"after_limit_invalid">>}}; + PagerParams = #{} -> + case emqx_mgmt:list_client_msgs(MsgType, ClientID, PagerParams) of + {error, not_found} -> + {404, ?CLIENTID_NOT_FOUND}; + {Msgs, Meta = #{}} when is_list(Msgs) -> + format_msgs_resp(MsgType, Msgs, Meta, QString) + end + end. + +parse_cont_pager_params(QString, MsgType) -> + case emqx_mgmt_api:parse_cont_pager_params(QString, cont_encoding(MsgType)) of + false -> + false; + PagerParams -> + maybe_cast_cont(MsgType, PagerParams) + end. + +maybe_cast_cont(inflight_msgs, #{continuation := Cont} = PagerParams) when is_binary(Cont) -> + try + PagerParams#{continuation => emqx_utils_conv:int(Cont)} + catch + _:_ -> + false + end; +maybe_cast_cont(_, PagerParams) -> + PagerParams. + +%% integer packet id +cont_encoding(inflight_msgs) -> none; +%% binary message id +cont_encoding(mqueue_msgs) -> base64. + %%-------------------------------------------------------------------- %% QueryString to Match Spec @@ -1197,6 +1330,79 @@ format_persistent_session_info(ClientId, PSInfo0) -> ), result_format_undefined_to_null(PSInfo). +format_msgs_resp(MsgType, Msgs, Meta, QString) -> + #{ + <<"payload">> := PayloadFmt, + <<"max_payload_bytes">> := MaxBytes + } = QString, + Meta1 = emqx_mgmt_api:encode_cont_pager_params(Meta, cont_encoding(MsgType)), + Resp = #{meta => Meta1, data => format_msgs(Msgs, PayloadFmt, MaxBytes)}, + %% Make sure minirest won't set another content-type for self-encoded JSON response body + Headers = #{<<"content-type">> => <<"application/json">>}, + case emqx_utils_json:safe_encode(Resp) of + {ok, RespBin} -> + {200, Headers, RespBin}; + _Error when PayloadFmt =:= plain -> + ?BAD_REQUEST( + <<"INVALID_PARAMETER">>, + <<"Some message payloads are not JSON serializable">> + ); + %% Unexpected internal error + Error -> + ?INTERNAL_ERROR(Error) + end. + +format_msgs([FirstMsg | Msgs], PayloadFmt, MaxBytes) -> + %% Always include at least one message payload, even if it exceeds the limit + {FirstMsg1, PayloadSize0} = format_msg(FirstMsg, PayloadFmt), + {Msgs1, _} = + catch lists:foldl( + fun(Msg, {MsgsAcc, SizeAcc} = Acc) -> + {Msg1, PayloadSize} = format_msg(Msg, PayloadFmt), + case SizeAcc + PayloadSize of + SizeAcc1 when SizeAcc1 =< MaxBytes -> + {[Msg1 | MsgsAcc], SizeAcc1}; + _ -> + throw(Acc) + end + end, + {[FirstMsg1], PayloadSize0}, + Msgs + ), + lists:reverse(Msgs1); +format_msgs([], _PayloadFmt, _MaxBytes) -> + []. + +format_msg( + #message{ + id = ID, + qos = Qos, + topic = Topic, + from = From, + timestamp = Timestamp, + headers = Headers, + payload = Payload + }, + PayloadFmt +) -> + Msg = #{ + msgid => emqx_guid:to_hexstr(ID), + qos => Qos, + topic => Topic, + publish_at => Timestamp, + from_clientid => emqx_utils_conv:bin(From), + from_username => maps:get(username, Headers, <<>>) + }, + format_payload(PayloadFmt, Msg, Payload). + +format_payload(none, Msg, _Payload) -> + {Msg, 0}; +format_payload(base64, Msg, Payload) -> + Payload1 = base64:encode(Payload), + {Msg#{payload => Payload1}, erlang:byte_size(Payload1)}; +format_payload(plain, Msg, Payload) -> + {Msg#{payload => Payload}, erlang:iolist_size(Payload)}. + %% format func helpers take_maps_from_inner(_Key, Value, Current) when is_map(Value) -> maps:merge(Current, Value); @@ -1298,6 +1504,17 @@ client_example() -> <<"recv_msg.qos0">> => 0 }. +message_example() -> + #{ + <<"msgid">> => <<"000611F460D57FA9F44500000D360002">>, + <<"topic">> => <<"t/test">>, + <<"qos">> => 0, + <<"publish_at">> => 1709055346487, + <<"from_clientid">> => <<"mqttx_59ac0a87">>, + <<"from_username">> => <<"test-user">>, + <<"payload">> => <<"eyJmb28iOiAiYmFyIn0=">> + }. + sessions_count(get, #{query_string := QString}) -> Since = maps:get(<<"since">>, QString, 0), Count = emqx_cm_registry_keeper:count(Since), diff --git a/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl index e4ad37e04..a007de829 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl @@ -23,16 +23,23 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("emqx/include/asserts.hrl"). +-include_lib("emqx/include/emqx_mqtt.hrl"). all() -> AllTCs = emqx_common_test_helpers:all(?MODULE), [ - {group, persistent_sessions} - | AllTCs -- persistent_session_testcases() + {group, persistent_sessions}, + {group, msgs_base64_encoding}, + {group, msgs_plain_encoding} + | AllTCs -- (persistent_session_testcases() ++ client_msgs_testcases()) ]. groups() -> - [{persistent_sessions, persistent_session_testcases()}]. + [ + {persistent_sessions, persistent_session_testcases()}, + {msgs_base64_encoding, client_msgs_testcases()}, + {msgs_plain_encoding, client_msgs_testcases()} + ]. persistent_session_testcases() -> [ @@ -42,12 +49,19 @@ persistent_session_testcases() -> t_persistent_sessions4, t_persistent_sessions5 ]. +client_msgs_testcases() -> + [ + t_inflight_messages, + t_mqueue_messages + ]. init_per_suite(Config) -> + ok = snabbkaffe:start_trace(), emqx_mgmt_api_test_util:init_suite(), Config. end_per_suite(_) -> + ok = snabbkaffe:stop(), emqx_mgmt_api_test_util:end_suite(). init_per_group(persistent_sessions, Config) -> @@ -67,6 +81,10 @@ init_per_group(persistent_sessions, Config) -> #{work_dir => emqx_cth_suite:work_dir(Config)} ), [{nodes, Nodes} | Config]; +init_per_group(msgs_base64_encoding, Config) -> + [{payload_encoding, base64} | Config]; +init_per_group(msgs_plain_encoding, Config) -> + [{payload_encoding, plain} | Config]; init_per_group(_Group, Config) -> Config. @@ -77,6 +95,21 @@ end_per_group(persistent_sessions, Config) -> end_per_group(_Group, _Config) -> ok. +end_per_testcase(TC, _Config) when + TC =:= t_inflight_messages; + TC =:= t_mqueue_messages +-> + ClientId = atom_to_binary(TC), + lists:foreach(fun(P) -> exit(P, kill) end, emqx_cm:lookup_channels(local, ClientId)), + ok = emqx_common_test_helpers:wait_for( + ?FUNCTION_NAME, + ?LINE, + fun() -> [] =:= emqx_cm:lookup_channels(local, ClientId) end, + 5000 + ); +end_per_testcase(_TC, _Config) -> + ok. + t_clients(_) -> process_flag(trap_exit, true), @@ -759,8 +792,206 @@ t_client_id_not_found(_Config) -> ?assertMatch({error, {Http, _, Body}}, PostFun(post, PathFun(["unsubscribe"]), UnsubBody)), ?assertMatch( {error, {Http, _, Body}}, PostFun(post, PathFun(["unsubscribe", "bulk"]), [UnsubBody]) + ), + %% Mqueue messages + ?assertMatch({error, {Http, _, Body}}, ReqFun(get, PathFun(["mqueue_messages"]))), + %% Inflight messages + ?assertMatch({error, {Http, _, Body}}, ReqFun(get, PathFun(["inflight_messages"]))). + +t_mqueue_messages(Config) -> + ClientId = atom_to_binary(?FUNCTION_NAME), + Topic = <<"t/test_mqueue_msgs">>, + Count = emqx_mgmt:default_row_limit(), + {ok, _Client} = client_with_mqueue(ClientId, Topic, Count), + Path = emqx_mgmt_api_test_util:api_path(["clients", ClientId, "mqueue_messages"]), + ?assert(Count =< emqx:get_config([mqtt, max_mqueue_len])), + AuthHeader = emqx_mgmt_api_test_util:auth_header_(), + test_messages(Path, Topic, Count, AuthHeader, ?config(payload_encoding, Config)), + + ?assertMatch( + {error, {_, 400, _}}, + emqx_mgmt_api_test_util:request_api( + get, Path, "limit=10&after=not-base64%23%21", AuthHeader + ) + ), + ?assertMatch( + {error, {_, 400, _}}, + emqx_mgmt_api_test_util:request_api( + get, Path, "limit=-5&after=not-base64%23%21", AuthHeader + ) ). +t_inflight_messages(Config) -> + ClientId = atom_to_binary(?FUNCTION_NAME), + Topic = <<"t/test_inflight_msgs">>, + PubCount = emqx_mgmt:default_row_limit(), + {ok, Client} = client_with_inflight(ClientId, Topic, PubCount), + Path = emqx_mgmt_api_test_util:api_path(["clients", ClientId, "inflight_messages"]), + InflightLimit = emqx:get_config([mqtt, max_inflight]), + AuthHeader = emqx_mgmt_api_test_util:auth_header_(), + test_messages(Path, Topic, InflightLimit, AuthHeader, ?config(payload_encoding, Config)), + + ?assertMatch( + {error, {_, 400, _}}, + emqx_mgmt_api_test_util:request_api( + get, Path, "limit=10&after=not-int", AuthHeader + ) + ), + ?assertMatch( + {error, {_, 400, _}}, + emqx_mgmt_api_test_util:request_api( + get, Path, "limit=-5&after=invalid-int", AuthHeader + ) + ), + emqtt:stop(Client). + +client_with_mqueue(ClientId, Topic, Count) -> + {ok, Client} = emqtt:start_link([ + {proto_ver, v5}, + {clientid, ClientId}, + {clean_start, false}, + {properties, #{'Session-Expiry-Interval' => 120}} + ]), + {ok, _} = emqtt:connect(Client), + {ok, _, _} = emqtt:subscribe(Client, Topic, 1), + ok = emqtt:disconnect(Client), + publish_msgs(Topic, Count), + {ok, Client}. + +client_with_inflight(ClientId, Topic, Count) -> + {ok, Client} = emqtt:start_link([ + {proto_ver, v5}, + {clientid, ClientId}, + {clean_start, true}, + {auto_ack, never} + ]), + {ok, _} = emqtt:connect(Client), + {ok, _, _} = emqtt:subscribe(Client, Topic, 1), + publish_msgs(Topic, Count), + {ok, Client}. + +publish_msgs(Topic, Count) -> + lists:foreach( + fun(Seq) -> + emqx_broker:publish(emqx_message:make(undefined, ?QOS_1, Topic, integer_to_binary(Seq))) + end, + lists:seq(1, Count) + ). + +test_messages(Path, Topic, Count, AuthHeader, PayloadEncoding) -> + Qs0 = io_lib:format("payload=~s", [PayloadEncoding]), + {ok, MsgsResp} = emqx_mgmt_api_test_util:request_api(get, Path, Qs0, AuthHeader), + #{<<"meta">> := Meta, <<"data">> := Msgs} = emqx_utils_json:decode(MsgsResp), + + ?assertMatch( + #{ + <<"last">> := <<"end_of_data">>, + <<"count">> := Count + }, + Meta + ), + ?assertEqual(length(Msgs), Count), + lists:foreach( + fun({Seq, #{<<"payload">> := P} = M}) -> + ?assertEqual(Seq, binary_to_integer(decode_payload(P, PayloadEncoding))), + ?assertMatch( + #{ + <<"msgid">> := _, + <<"topic">> := Topic, + <<"qos">> := _, + <<"publish_at">> := _, + <<"from_clientid">> := _, + <<"from_username">> := _ + }, + M + ) + end, + lists:zip(lists:seq(1, Count), Msgs) + ), + + %% The first message payload is <<"1">>, + %% and when it is urlsafe base64 encoded (with no padding), it's <<"MQ">>, + %% so we cover both cases: + %% - when total payload size exceeds the limit, + %% - when the first message payload already exceeds the limit but is still returned in the response. + QsPayloadLimit = io_lib:format("payload=~s&max_payload_bytes=1", [PayloadEncoding]), + {ok, LimitedMsgsResp} = emqx_mgmt_api_test_util:request_api( + get, Path, QsPayloadLimit, AuthHeader + ), + #{<<"meta">> := _, <<"data">> := FirstMsgOnly} = emqx_utils_json:decode(LimitedMsgsResp), + ct:pal("~p", [FirstMsgOnly]), + ?assertEqual(1, length(FirstMsgOnly)), + ?assertEqual( + <<"1">>, decode_payload(maps:get(<<"payload">>, hd(FirstMsgOnly)), PayloadEncoding) + ), + + Limit = 19, + LastCont = lists:foldl( + fun(PageSeq, Cont) -> + Qs = io_lib:format("payload=~s&after=~s&limit=~p", [PayloadEncoding, Cont, Limit]), + {ok, MsgsRespP} = emqx_mgmt_api_test_util:request_api(get, Path, Qs, AuthHeader), + #{ + <<"meta">> := #{<<"last">> := NextCont} = MetaP, + <<"data">> := MsgsP + } = emqx_utils_json:decode(MsgsRespP), + ?assertMatch(#{<<"count">> := Count}, MetaP), + ?assertNotEqual(<<"end_of_data">>, NextCont), + ?assertEqual(length(MsgsP), Limit), + ExpFirstPayload = integer_to_binary(PageSeq * Limit - Limit + 1), + ExpLastPayload = integer_to_binary(PageSeq * Limit), + ?assertEqual( + ExpFirstPayload, decode_payload(maps:get(<<"payload">>, hd(MsgsP)), PayloadEncoding) + ), + ?assertEqual( + ExpLastPayload, + decode_payload(maps:get(<<"payload">>, lists:last(MsgsP)), PayloadEncoding) + ), + NextCont + end, + none, + lists:seq(1, Count div 19) + ), + LastPartialPage = Count div 19 + 1, + LastQs = io_lib:format("payload=~s&after=~s&limit=~p", [PayloadEncoding, LastCont, Limit]), + {ok, MsgsRespLastP} = emqx_mgmt_api_test_util:request_api(get, Path, LastQs, AuthHeader), + #{<<"meta">> := #{<<"last">> := EmptyCont} = MetaLastP, <<"data">> := MsgsLastP} = emqx_utils_json:decode( + MsgsRespLastP + ), + ?assertEqual(<<"end_of_data">>, EmptyCont), + ?assertMatch(#{<<"count">> := Count}, MetaLastP), + + ?assertEqual( + integer_to_binary(LastPartialPage * Limit - Limit + 1), + decode_payload(maps:get(<<"payload">>, hd(MsgsLastP)), PayloadEncoding) + ), + ?assertEqual( + integer_to_binary(Count), + decode_payload(maps:get(<<"payload">>, lists:last(MsgsLastP)), PayloadEncoding) + ), + + ExceedQs = io_lib:format("payload=~s&after=~s&limit=~p", [ + PayloadEncoding, EmptyCont, Limit + ]), + ?assertMatch( + {error, {_, 400, _}}, + emqx_mgmt_api_test_util:request_api(get, Path, ExceedQs, AuthHeader) + ), + + %% Invalid common page params + ?assertMatch( + {error, {_, 400, _}}, + emqx_mgmt_api_test_util:request_api(get, Path, "limit=0", AuthHeader) + ), + ?assertMatch( + {error, {_, 400, _}}, + emqx_mgmt_api_test_util:request_api(get, Path, "limit=limit", AuthHeader) + ). + +decode_payload(Payload, base64) -> + base64:decode(Payload); +decode_payload(Payload, _) -> + Payload. + t_subscribe_shared_topic(_Config) -> ClientId = <<"client_subscribe_shared">>, diff --git a/changes/ce/feat-12561.en.md b/changes/ce/feat-12561.en.md new file mode 100644 index 000000000..072a71373 --- /dev/null +++ b/changes/ce/feat-12561.en.md @@ -0,0 +1,21 @@ +Implement HTTP APIs to get the list of client's inflight and mqueue messages. + +To get the first chunk of data: + - GET /clients/{clientid}/mqueue_messages?limit=100 + - GET /clients/{clientid}/inflight_messages?limit=100 + +Alternatively: + - GET /clients/{clientid}/mqueue_messages?limit=100&after=none + - GET /clients/{clientid}/inflight_messages?limit=100&after=none + +To get the next chunk of data: + - GET /clients/{clientid}/mqueue_messages?limit=100&after={last} + - GET /clients/{clientid}/inflight_messages?limit=100&after={last} + + Where {last} is a value (opaque string token) of "meta.last" field from the previous response. + + If there is no more data, "last" = "end_of_data" is returned. + If a subsequent request is attempted with "after=end_of_data", a "400 Bad Request" error response will be received. + +Mqueue messages are ordered according to the queue (FIFO) order. +Inflight messages are ordered by MQTT Packet Id, which may not represent the chronological messages order. diff --git a/rel/i18n/emqx_mgmt_api_clients.hocon b/rel/i18n/emqx_mgmt_api_clients.hocon index 2431c09ec..3175715e0 100644 --- a/rel/i18n/emqx_mgmt_api_clients.hocon +++ b/rel/i18n/emqx_mgmt_api_clients.hocon @@ -35,6 +35,57 @@ get_client_subs.desc: get_client_subs.label: """Get client subscriptions""" +get_client_mqueue_msgs.desc: +"""Get client mqueue messages""" +get_client_mqueue_msgs.label: +"""Get client mqueue messages""" + +get_client_inflight_msgs.desc: +"""Get client inflight messages""" +get_client_inflight_msgs.label: +"""Get client inflight messages""" + +mqueue_msgs_list.desc: +"""Client's mqueue messages list. The queue (FIFO) ordering is preserved.""" +mqueue_msgs_list.label: +"""Client's mqueue messages""" + +inflight_msgs_list.desc: +"""Client's inflight messages list. +Ordered by MQTT Packet Id, which may not represent the chronological messages order.""" +inflight_msgs_list.label: +"""Client's inflight messages""" + +msg_id.desc: +"""Message ID.""" +msg_id.label: +"""Message ID""" + +msg_topic.desc: +"""Message topic.""" +msg_topic.label: +"""Message Topic""" + +msg_qos.desc: +"""Message QoS.""" +msg_topic.label: +"""Message Qos""" + +msg_publish_at.desc: +"""Message publish time, a millisecond precision Unix epoch timestamp.""" +msg_publish_at.label: +"""Message Publish Time.""" + +msg_from_clientid.desc: +"""Message publisher's client ID.""" +msg_from_clientid.desc: +"""Message publisher's Client ID""" + +msg_from_username.desc: +"""Message publisher's username.""" +msg_from_username.label: +"""Message Publisher's Username """ + subscribe.desc: """Subscribe""" subscribe.label: