From 6895ce997873a0be5885230d29795bf5fc49d402 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Thu, 5 May 2022 15:57:37 +0800 Subject: [PATCH 1/2] fix(retainer): retainer message page read --- apps/emqx_retainer/src/emqx_retainer_api.erl | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/apps/emqx_retainer/src/emqx_retainer_api.erl b/apps/emqx_retainer/src/emqx_retainer_api.erl index 73436fa1e..2c0bd725c 100644 --- a/apps/emqx_retainer/src/emqx_retainer_api.erl +++ b/apps/emqx_retainer/src/emqx_retainer_api.erl @@ -18,8 +18,7 @@ -behaviour(minirest_api). --include_lib("emqx/include/emqx.hrl"). --include_lib("typerefl/include/types.hrl"). +-include("emqx_retainer.hrl"). -include_lib("hocon/include/hoconsc.hrl"). %% API @@ -31,7 +30,7 @@ config/2 ]). --import(hoconsc, [mk/2, ref/1, ref/2, array/1]). +-import(hoconsc, [mk/1, mk/2, ref/1, ref/2, array/1]). -import(emqx_dashboard_swagger, [error_codes/2]). %% 1MB = 1024 x 1024 @@ -76,7 +75,10 @@ schema(?PREFIX ++ "/messages") -> description => ?DESC(list_retained_api), parameters => page_params(), responses => #{ - 200 => mk(array(ref(message_summary)), #{desc => ?DESC(retained_list)}), + 200 => [ + {data, mk(array(ref(message_summary)), #{desc => ?DESC(retained_list)})}, + {meta, mk(hoconsc:ref(emqx_dashboard_swagger, meta))} + ], 400 => error_codes(['BAD_REQUEST'], ?DESC(unsupported_backend)) } } @@ -163,10 +165,13 @@ config(put, #{body := Body}) -> %% Interval Funcs %%------------------------------------------------------------------------------ lookup_retained(get, #{query_string := Qs}) -> - Page = maps:get(page, Qs, 1), - Limit = maps:get(page, Qs, emqx_mgmt:max_row_limit()), + Page = maps:get(<<"page">>, Qs, 1), + Limit = maps:get(<<"limit">>, Qs, emqx_mgmt:max_row_limit()), {ok, Msgs} = emqx_retainer_mnesia:page_read(undefined, undefined, Page, Limit), - {200, [format_message(Msg) || Msg <- Msgs]}. + {200, #{ + data => [format_message(Msg) || Msg <- Msgs], + meta => #{page => Page, limit => Limit, count => emqx_retainer_mnesia:size(?TAB_MESSAGE)} + }}. with_topic(get, #{bindings := Bindings}) -> Topic = maps:get(topic, Bindings), From 041ea4ba5c4d598dfe2f76fd8cd4142099868c1e Mon Sep 17 00:00:00 2001 From: JimMoen Date: Thu, 5 May 2022 15:58:03 +0800 Subject: [PATCH 2/2] test(retainer): retainer api page read --- .../test/emqx_retainer_api_SUITE.erl | 80 +++++++++++++++++-- 1 file changed, 75 insertions(+), 5 deletions(-) diff --git a/apps/emqx_retainer/test/emqx_retainer_api_SUITE.erl b/apps/emqx_retainer/test/emqx_retainer_api_SUITE.erl index 9b80a1cc3..6b5c2d10e 100644 --- a/apps/emqx_retainer/test/emqx_retainer_api_SUITE.erl +++ b/apps/emqx_retainer/test/emqx_retainer_api_SUITE.erl @@ -35,14 +35,16 @@ init_per_suite(Config) -> ok = ekka:start(), ok = mria_rlog:wait_for_shards([?CLUSTER_RPC_SHARD], infinity), emqx_retainer_SUITE:load_base_conf(), - emqx_mgmt_api_test_util:init_suite([emqx_retainer]), + emqx_mgmt_api_test_util:init_suite([emqx_retainer, emqx_conf]), + %% make sure no "$SYS/#" topics + emqx_conf:update([sys_topics], raw_systopic_conf(), #{override_to => cluster}), Config. end_per_suite(Config) -> ekka:stop(), mria:stop(), mria_mnesia:delete_schema(), - emqx_mgmt_api_test_util:end_suite([emqx_retainer]), + emqx_mgmt_api_test_util:end_suite([emqx_retainer, emqx_conf]), Config. init_per_testcase(_, Config) -> @@ -110,11 +112,10 @@ t_messages(_) -> ), {ok, MsgsJson} = request_api(get, api_path(["mqtt", "retainer", "messages"])), - Msgs = decode_json(MsgsJson), + #{data := Msgs, meta := _} = decode_json(MsgsJson), MsgLen = erlang:length(Msgs), ?assert( - MsgLen >= 5, - %% maybe has $SYS messages + MsgLen =:= 5, io_lib:format("message length is:~p~n", [MsgLen]) ), @@ -133,6 +134,59 @@ t_messages(_) -> ok = emqtt:disconnect(C1). +t_messages_page(_) -> + {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]), + {ok, _} = emqtt:connect(C1), + emqx_retainer:clean(), + + Each = fun(I) -> + emqtt:publish( + C1, + <<"retained/", (I + 60)>>, + <<"retained">>, + [{qos, 0}, {retain, true}] + ) + end, + + ?check_trace( + ?wait_async_action( + lists:foreach(Each, lists:seq(1, 5)), + #{?snk_kind := message_retained, topic := <<"retained/A">>}, + 500 + ), + [] + ), + Page = 4, + + {ok, MsgsJson} = request_api( + get, + api_path([ + "mqtt", "retainer", "messages?page=" ++ erlang:integer_to_list(Page) ++ "&limit=1" + ]) + ), + #{data := Msgs, meta := #{page := Page, limit := 1}} = decode_json(MsgsJson), + MsgLen = erlang:length(Msgs), + ?assert( + MsgLen =:= 1, + io_lib:format("message length is:~p~n", [MsgLen]) + ), + + [OnlyOne] = Msgs, + Topic = <<"retained/", (Page + 60)>>, + ?assertMatch( + #{ + msgid := _, + topic := Topic, + qos := _, + publish_at := _, + from_clientid := _, + from_username := _ + }, + OnlyOne + ), + + ok = emqtt:disconnect(C1). + t_lookup_and_delete(_) -> {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]), {ok, _} = emqtt:connect(C1), @@ -171,3 +225,19 @@ t_lookup_and_delete(_) -> decode_json(Data) -> BinJson = emqx_json:decode(Data, [return_maps]), emqx_map_lib:unsafe_atom_key_map(BinJson). + +%%-------------------------------------------------------------------- +%% Internal funcs +%%-------------------------------------------------------------------- +raw_systopic_conf() -> + #{ + <<"sys_event_messages">> => + #{ + <<"client_connected">> => false, + <<"client_disconnected">> => false, + <<"client_subscribed">> => false, + <<"client_unsubscribed">> => false + }, + <<"sys_heartbeat_interval">> => <<"1440m">>, + <<"sys_msg_interval">> => <<"1440m">> + }.