diff --git a/apps/emqx_retainer/src/emqx_retainer.erl b/apps/emqx_retainer/src/emqx_retainer.erl index 73b7fbf90..fbf5e8e24 100644 --- a/apps/emqx_retainer/src/emqx_retainer.erl +++ b/apps/emqx_retainer/src/emqx_retainer.erl @@ -78,7 +78,7 @@ -callback store_retained(context(), message()) -> ok. -callback read_message(context(), topic()) -> {ok, list()}. -callback page_read(context(), topic(), non_neg_integer(), non_neg_integer()) -> - {ok, list()}. + {ok, HasNext :: boolean(), list()}. -callback match_messages(context(), topic(), cursor()) -> {ok, list(), cursor()}. -callback clear_expired(context()) -> ok. -callback clean(context()) -> ok. diff --git a/apps/emqx_retainer/src/emqx_retainer_api.erl b/apps/emqx_retainer/src/emqx_retainer_api.erl index 5aa1b7a31..bb232f9e4 100644 --- a/apps/emqx_retainer/src/emqx_retainer_api.erl +++ b/apps/emqx_retainer/src/emqx_retainer_api.erl @@ -181,15 +181,27 @@ lookup_retained(get, #{query_string := Qs}) -> Page = maps:get(<<"page">>, Qs, 1), Limit = maps:get(<<"limit">>, Qs, emqx_mgmt:default_row_limit()), Topic = maps:get(<<"topic">>, Qs, undefined), - {ok, Msgs} = emqx_retainer_mnesia:page_read(undefined, Topic, Page, Limit), + {ok, HasNext, Msgs} = emqx_retainer_mnesia:page_read(undefined, Topic, Page, Limit), + Meta = + case Topic of + undefined -> + #{count => emqx_retainer_mnesia:size(?TAB_MESSAGE)}; + _ -> + #{} + end, {200, #{ data => [format_message(Msg) || Msg <- Msgs], - meta => #{page => Page, limit => Limit, count => emqx_retainer_mnesia:size(?TAB_MESSAGE)} + meta => + Meta#{ + page => Page, + limit => Limit, + hasnext => HasNext + } }}. with_topic(get, #{bindings := Bindings}) -> Topic = maps:get(topic, Bindings), - {ok, Msgs} = emqx_retainer_mnesia:page_read(undefined, Topic, 1, 1), + {ok, _, Msgs} = emqx_retainer_mnesia:page_read(undefined, Topic, 1, 1), case Msgs of [H | _] -> {200, format_detail_message(H)}; @@ -202,12 +214,12 @@ with_topic(get, #{bindings := Bindings}) -> with_topic(delete, #{bindings := Bindings}) -> Topic = maps:get(topic, Bindings), case emqx_retainer_mnesia:page_read(undefined, Topic, 1, 1) of - {ok, []} -> + {ok, _, []} -> {404, #{ code => <<"NOT_FOUND">>, message => <<"Viewed message doesn't exist">> }}; - {ok, _} -> + {ok, _, _} -> emqx_retainer_mnesia:delete_message(undefined, Topic), {204} end. diff --git a/apps/emqx_retainer/src/emqx_retainer_mnesia.erl b/apps/emqx_retainer/src/emqx_retainer_mnesia.erl index 73c86fe04..831969904 100644 --- a/apps/emqx_retainer/src/emqx_retainer_mnesia.erl +++ b/apps/emqx_retainer/src/emqx_retainer_mnesia.erl @@ -231,20 +231,18 @@ page_read(_, Topic, Page, Limit) -> false -> more end, - PageRows = - case SkipResult of - closed -> - []; - more -> - case qlc_next_answers(Cursor, Limit) of - {closed, Rows} -> - Rows; - {more, Rows} -> - qlc:delete_cursor(Cursor), - Rows - end - end, - {ok, PageRows}. + case SkipResult of + closed -> + {ok, false, []}; + more -> + case qlc_next_answers(Cursor, Limit) of + {closed, Rows} -> + {ok, false, Rows}; + {more, Rows} -> + qlc:delete_cursor(Cursor), + {ok, true, Rows} + end + end. clean(_) -> _ = mria:clear_table(?TAB_MESSAGE), diff --git a/apps/emqx_retainer/test/emqx_retainer_SUITE.erl b/apps/emqx_retainer/test/emqx_retainer_SUITE.erl index 595f37fff..2818f6bfa 100644 --- a/apps/emqx_retainer/test/emqx_retainer_SUITE.erl +++ b/apps/emqx_retainer/test/emqx_retainer_SUITE.erl @@ -133,7 +133,7 @@ t_store_and_clean(_) -> ), timer:sleep(100), - {ok, List} = emqx_retainer:page_read(<<"retained">>, 1, 10), + {ok, _, List} = emqx_retainer:page_read(<<"retained">>, 1, 10), ?assertEqual(1, length(List)), ?assertMatch( {ok, [#message{payload = <<"this is a retained message">>}]}, @@ -159,7 +159,7 @@ t_store_and_clean(_) -> ), ok = emqx_retainer:clean(), - {ok, List2} = emqx_retainer:page_read(<<"retained">>, 1, 10), + {ok, _, List2} = emqx_retainer:page_read(<<"retained">>, 1, 10), ?assertEqual(0, length(List2)), ?assertMatch( {ok, []}, @@ -486,12 +486,12 @@ t_clear_expired(_) -> ), timer:sleep(1000), - {ok, List} = emqx_retainer:page_read(<<"retained/+">>, 1, 10), + {ok, _, List} = emqx_retainer:page_read(<<"retained/+">>, 1, 10), ?assertEqual(5, erlang:length(List)), timer:sleep(4500), - {ok, List2} = emqx_retainer:page_read(<<"retained/+">>, 1, 10), + {ok, _, List2} = emqx_retainer:page_read(<<"retained/+">>, 1, 10), ?assertEqual(0, erlang:length(List2)), ok = emqtt:disconnect(C1) @@ -522,7 +522,7 @@ t_max_payload_size(_) -> ), timer:sleep(500), - {ok, List} = emqx_retainer:page_read(<<"retained/+">>, 1, 10), + {ok, _, List} = emqx_retainer:page_read(<<"retained/+">>, 1, 10), ?assertEqual(1, erlang:length(List)), ok = emqtt:disconnect(C1) @@ -546,10 +546,10 @@ t_page_read(_) -> lists:foreach(Fun, lists:seq(1, 9)), timer:sleep(200), - {ok, List} = emqx_retainer:page_read(<<"retained/+">>, 1, 5), + {ok, _, List} = emqx_retainer:page_read(<<"retained/+">>, 1, 5), ?assertEqual(5, length(List)), - {ok, List2} = emqx_retainer:page_read(<<"retained/+">>, 2, 5), + {ok, _, List2} = emqx_retainer:page_read(<<"retained/+">>, 2, 5), ?assertEqual(4, length(List2)), ok = emqtt:disconnect(C1).