Merge pull request #7866 from JimMoen/fix-retainer-api-page-limit
Fix retainer api page limit
This commit is contained in:
commit
596005ca5f
|
@ -18,8 +18,7 @@
|
||||||
|
|
||||||
-behaviour(minirest_api).
|
-behaviour(minirest_api).
|
||||||
|
|
||||||
-include_lib("emqx/include/emqx.hrl").
|
-include("emqx_retainer.hrl").
|
||||||
-include_lib("typerefl/include/types.hrl").
|
|
||||||
-include_lib("hocon/include/hoconsc.hrl").
|
-include_lib("hocon/include/hoconsc.hrl").
|
||||||
|
|
||||||
%% API
|
%% API
|
||||||
|
@ -31,7 +30,7 @@
|
||||||
config/2
|
config/2
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-import(hoconsc, [mk/2, ref/1, ref/2, array/1]).
|
-import(hoconsc, [mk/1, mk/2, ref/1, ref/2, array/1]).
|
||||||
-import(emqx_dashboard_swagger, [error_codes/2]).
|
-import(emqx_dashboard_swagger, [error_codes/2]).
|
||||||
|
|
||||||
%% 1MB = 1024 x 1024
|
%% 1MB = 1024 x 1024
|
||||||
|
@ -76,7 +75,10 @@ schema(?PREFIX ++ "/messages") ->
|
||||||
description => ?DESC(list_retained_api),
|
description => ?DESC(list_retained_api),
|
||||||
parameters => page_params(),
|
parameters => page_params(),
|
||||||
responses => #{
|
responses => #{
|
||||||
200 => mk(array(ref(message_summary)), #{desc => ?DESC(retained_list)}),
|
200 => [
|
||||||
|
{data, mk(array(ref(message_summary)), #{desc => ?DESC(retained_list)})},
|
||||||
|
{meta, mk(hoconsc:ref(emqx_dashboard_swagger, meta))}
|
||||||
|
],
|
||||||
400 => error_codes(['BAD_REQUEST'], ?DESC(unsupported_backend))
|
400 => error_codes(['BAD_REQUEST'], ?DESC(unsupported_backend))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -163,10 +165,13 @@ config(put, #{body := Body}) ->
|
||||||
%% Interval Funcs
|
%% Interval Funcs
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
lookup_retained(get, #{query_string := Qs}) ->
|
lookup_retained(get, #{query_string := Qs}) ->
|
||||||
Page = maps:get(page, Qs, 1),
|
Page = maps:get(<<"page">>, Qs, 1),
|
||||||
Limit = maps:get(page, Qs, emqx_mgmt:max_row_limit()),
|
Limit = maps:get(<<"limit">>, Qs, emqx_mgmt:max_row_limit()),
|
||||||
{ok, Msgs} = emqx_retainer_mnesia:page_read(undefined, undefined, Page, Limit),
|
{ok, Msgs} = emqx_retainer_mnesia:page_read(undefined, undefined, Page, Limit),
|
||||||
{200, [format_message(Msg) || Msg <- Msgs]}.
|
{200, #{
|
||||||
|
data => [format_message(Msg) || Msg <- Msgs],
|
||||||
|
meta => #{page => Page, limit => Limit, count => emqx_retainer_mnesia:size(?TAB_MESSAGE)}
|
||||||
|
}}.
|
||||||
|
|
||||||
with_topic(get, #{bindings := Bindings}) ->
|
with_topic(get, #{bindings := Bindings}) ->
|
||||||
Topic = maps:get(topic, Bindings),
|
Topic = maps:get(topic, Bindings),
|
||||||
|
|
|
@ -35,14 +35,16 @@ init_per_suite(Config) ->
|
||||||
ok = ekka:start(),
|
ok = ekka:start(),
|
||||||
ok = mria_rlog:wait_for_shards([?CLUSTER_RPC_SHARD], infinity),
|
ok = mria_rlog:wait_for_shards([?CLUSTER_RPC_SHARD], infinity),
|
||||||
emqx_retainer_SUITE:load_base_conf(),
|
emqx_retainer_SUITE:load_base_conf(),
|
||||||
emqx_mgmt_api_test_util:init_suite([emqx_retainer]),
|
emqx_mgmt_api_test_util:init_suite([emqx_retainer, emqx_conf]),
|
||||||
|
%% make sure no "$SYS/#" topics
|
||||||
|
emqx_conf:update([sys_topics], raw_systopic_conf(), #{override_to => cluster}),
|
||||||
Config.
|
Config.
|
||||||
|
|
||||||
end_per_suite(Config) ->
|
end_per_suite(Config) ->
|
||||||
ekka:stop(),
|
ekka:stop(),
|
||||||
mria:stop(),
|
mria:stop(),
|
||||||
mria_mnesia:delete_schema(),
|
mria_mnesia:delete_schema(),
|
||||||
emqx_mgmt_api_test_util:end_suite([emqx_retainer]),
|
emqx_mgmt_api_test_util:end_suite([emqx_retainer, emqx_conf]),
|
||||||
Config.
|
Config.
|
||||||
|
|
||||||
init_per_testcase(_, Config) ->
|
init_per_testcase(_, Config) ->
|
||||||
|
@ -110,11 +112,10 @@ t_messages(_) ->
|
||||||
),
|
),
|
||||||
|
|
||||||
{ok, MsgsJson} = request_api(get, api_path(["mqtt", "retainer", "messages"])),
|
{ok, MsgsJson} = request_api(get, api_path(["mqtt", "retainer", "messages"])),
|
||||||
Msgs = decode_json(MsgsJson),
|
#{data := Msgs, meta := _} = decode_json(MsgsJson),
|
||||||
MsgLen = erlang:length(Msgs),
|
MsgLen = erlang:length(Msgs),
|
||||||
?assert(
|
?assert(
|
||||||
MsgLen >= 5,
|
MsgLen =:= 5,
|
||||||
%% maybe has $SYS messages
|
|
||||||
io_lib:format("message length is:~p~n", [MsgLen])
|
io_lib:format("message length is:~p~n", [MsgLen])
|
||||||
),
|
),
|
||||||
|
|
||||||
|
@ -133,6 +134,59 @@ t_messages(_) ->
|
||||||
|
|
||||||
ok = emqtt:disconnect(C1).
|
ok = emqtt:disconnect(C1).
|
||||||
|
|
||||||
|
t_messages_page(_) ->
|
||||||
|
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
|
||||||
|
{ok, _} = emqtt:connect(C1),
|
||||||
|
emqx_retainer:clean(),
|
||||||
|
|
||||||
|
Each = fun(I) ->
|
||||||
|
emqtt:publish(
|
||||||
|
C1,
|
||||||
|
<<"retained/", (I + 60)>>,
|
||||||
|
<<"retained">>,
|
||||||
|
[{qos, 0}, {retain, true}]
|
||||||
|
)
|
||||||
|
end,
|
||||||
|
|
||||||
|
?check_trace(
|
||||||
|
?wait_async_action(
|
||||||
|
lists:foreach(Each, lists:seq(1, 5)),
|
||||||
|
#{?snk_kind := message_retained, topic := <<"retained/A">>},
|
||||||
|
500
|
||||||
|
),
|
||||||
|
[]
|
||||||
|
),
|
||||||
|
Page = 4,
|
||||||
|
|
||||||
|
{ok, MsgsJson} = request_api(
|
||||||
|
get,
|
||||||
|
api_path([
|
||||||
|
"mqtt", "retainer", "messages?page=" ++ erlang:integer_to_list(Page) ++ "&limit=1"
|
||||||
|
])
|
||||||
|
),
|
||||||
|
#{data := Msgs, meta := #{page := Page, limit := 1}} = decode_json(MsgsJson),
|
||||||
|
MsgLen = erlang:length(Msgs),
|
||||||
|
?assert(
|
||||||
|
MsgLen =:= 1,
|
||||||
|
io_lib:format("message length is:~p~n", [MsgLen])
|
||||||
|
),
|
||||||
|
|
||||||
|
[OnlyOne] = Msgs,
|
||||||
|
Topic = <<"retained/", (Page + 60)>>,
|
||||||
|
?assertMatch(
|
||||||
|
#{
|
||||||
|
msgid := _,
|
||||||
|
topic := Topic,
|
||||||
|
qos := _,
|
||||||
|
publish_at := _,
|
||||||
|
from_clientid := _,
|
||||||
|
from_username := _
|
||||||
|
},
|
||||||
|
OnlyOne
|
||||||
|
),
|
||||||
|
|
||||||
|
ok = emqtt:disconnect(C1).
|
||||||
|
|
||||||
t_lookup_and_delete(_) ->
|
t_lookup_and_delete(_) ->
|
||||||
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
|
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
|
||||||
{ok, _} = emqtt:connect(C1),
|
{ok, _} = emqtt:connect(C1),
|
||||||
|
@ -171,3 +225,19 @@ t_lookup_and_delete(_) ->
|
||||||
decode_json(Data) ->
|
decode_json(Data) ->
|
||||||
BinJson = emqx_json:decode(Data, [return_maps]),
|
BinJson = emqx_json:decode(Data, [return_maps]),
|
||||||
emqx_map_lib:unsafe_atom_key_map(BinJson).
|
emqx_map_lib:unsafe_atom_key_map(BinJson).
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Internal funcs
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
raw_systopic_conf() ->
|
||||||
|
#{
|
||||||
|
<<"sys_event_messages">> =>
|
||||||
|
#{
|
||||||
|
<<"client_connected">> => false,
|
||||||
|
<<"client_disconnected">> => false,
|
||||||
|
<<"client_subscribed">> => false,
|
||||||
|
<<"client_unsubscribed">> => false
|
||||||
|
},
|
||||||
|
<<"sys_heartbeat_interval">> => <<"1440m">>,
|
||||||
|
<<"sys_msg_interval">> => <<"1440m">>
|
||||||
|
}.
|
||||||
|
|
Loading…
Reference in New Issue