Merge pull request #12290 from lafirest/fix/retain_match

fix(retain): add `hasnext` into the meta data for the `GET /retain/messages`
This commit is contained in:
lafirest 2024-01-11 18:09:29 +08:00 committed by GitHub
commit f042462d53
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 37 additions and 27 deletions

View File

@ -78,7 +78,7 @@
-callback store_retained(context(), message()) -> ok. -callback store_retained(context(), message()) -> ok.
-callback read_message(context(), topic()) -> {ok, list()}. -callback read_message(context(), topic()) -> {ok, list()}.
-callback page_read(context(), topic(), non_neg_integer(), non_neg_integer()) -> -callback page_read(context(), topic(), non_neg_integer(), non_neg_integer()) ->
{ok, list()}. {ok, HasNext :: boolean(), list()}.
-callback match_messages(context(), topic(), cursor()) -> {ok, list(), cursor()}. -callback match_messages(context(), topic(), cursor()) -> {ok, list(), cursor()}.
-callback clear_expired(context()) -> ok. -callback clear_expired(context()) -> ok.
-callback clean(context()) -> ok. -callback clean(context()) -> ok.

View File

@ -181,15 +181,27 @@ lookup_retained(get, #{query_string := Qs}) ->
Page = maps:get(<<"page">>, Qs, 1), Page = maps:get(<<"page">>, Qs, 1),
Limit = maps:get(<<"limit">>, Qs, emqx_mgmt:default_row_limit()), Limit = maps:get(<<"limit">>, Qs, emqx_mgmt:default_row_limit()),
Topic = maps:get(<<"topic">>, Qs, undefined), Topic = maps:get(<<"topic">>, Qs, undefined),
{ok, Msgs} = emqx_retainer_mnesia:page_read(undefined, Topic, Page, Limit), {ok, HasNext, Msgs} = emqx_retainer_mnesia:page_read(undefined, Topic, Page, Limit),
Meta =
case Topic of
undefined ->
#{count => emqx_retainer_mnesia:size(?TAB_MESSAGE)};
_ ->
#{}
end,
{200, #{ {200, #{
data => [format_message(Msg) || Msg <- Msgs], data => [format_message(Msg) || Msg <- Msgs],
meta => #{page => Page, limit => Limit, count => emqx_retainer_mnesia:size(?TAB_MESSAGE)} meta =>
Meta#{
page => Page,
limit => Limit,
hasnext => HasNext
}
}}. }}.
with_topic(get, #{bindings := Bindings}) -> with_topic(get, #{bindings := Bindings}) ->
Topic = maps:get(topic, Bindings), Topic = maps:get(topic, Bindings),
{ok, Msgs} = emqx_retainer_mnesia:page_read(undefined, Topic, 1, 1), {ok, _, Msgs} = emqx_retainer_mnesia:page_read(undefined, Topic, 1, 1),
case Msgs of case Msgs of
[H | _] -> [H | _] ->
{200, format_detail_message(H)}; {200, format_detail_message(H)};
@ -202,12 +214,12 @@ with_topic(get, #{bindings := Bindings}) ->
with_topic(delete, #{bindings := Bindings}) -> with_topic(delete, #{bindings := Bindings}) ->
Topic = maps:get(topic, Bindings), Topic = maps:get(topic, Bindings),
case emqx_retainer_mnesia:page_read(undefined, Topic, 1, 1) of case emqx_retainer_mnesia:page_read(undefined, Topic, 1, 1) of
{ok, []} -> {ok, _, []} ->
{404, #{ {404, #{
code => <<"NOT_FOUND">>, code => <<"NOT_FOUND">>,
message => <<"Viewed message doesn't exist">> message => <<"Viewed message doesn't exist">>
}}; }};
{ok, _} -> {ok, _, _} ->
emqx_retainer_mnesia:delete_message(undefined, Topic), emqx_retainer_mnesia:delete_message(undefined, Topic),
{204} {204}
end. end.

View File

@ -231,20 +231,18 @@ page_read(_, Topic, Page, Limit) ->
false -> false ->
more more
end, end,
PageRows = case SkipResult of
case SkipResult of closed ->
closed -> {ok, false, []};
[]; more ->
more -> case qlc_next_answers(Cursor, Limit) of
case qlc_next_answers(Cursor, Limit) of {closed, Rows} ->
{closed, Rows} -> {ok, false, Rows};
Rows; {more, Rows} ->
{more, Rows} -> qlc:delete_cursor(Cursor),
qlc:delete_cursor(Cursor), {ok, true, Rows}
Rows end
end end.
end,
{ok, PageRows}.
clean(_) -> clean(_) ->
_ = mria:clear_table(?TAB_MESSAGE), _ = mria:clear_table(?TAB_MESSAGE),

View File

@ -133,7 +133,7 @@ t_store_and_clean(_) ->
), ),
timer:sleep(100), timer:sleep(100),
{ok, List} = emqx_retainer:page_read(<<"retained">>, 1, 10), {ok, _, List} = emqx_retainer:page_read(<<"retained">>, 1, 10),
?assertEqual(1, length(List)), ?assertEqual(1, length(List)),
?assertMatch( ?assertMatch(
{ok, [#message{payload = <<"this is a retained message">>}]}, {ok, [#message{payload = <<"this is a retained message">>}]},
@ -159,7 +159,7 @@ t_store_and_clean(_) ->
), ),
ok = emqx_retainer:clean(), ok = emqx_retainer:clean(),
{ok, List2} = emqx_retainer:page_read(<<"retained">>, 1, 10), {ok, _, List2} = emqx_retainer:page_read(<<"retained">>, 1, 10),
?assertEqual(0, length(List2)), ?assertEqual(0, length(List2)),
?assertMatch( ?assertMatch(
{ok, []}, {ok, []},
@ -486,12 +486,12 @@ t_clear_expired(_) ->
), ),
timer:sleep(1000), timer:sleep(1000),
{ok, List} = emqx_retainer:page_read(<<"retained/+">>, 1, 10), {ok, _, List} = emqx_retainer:page_read(<<"retained/+">>, 1, 10),
?assertEqual(5, erlang:length(List)), ?assertEqual(5, erlang:length(List)),
timer:sleep(4500), timer:sleep(4500),
{ok, List2} = emqx_retainer:page_read(<<"retained/+">>, 1, 10), {ok, _, List2} = emqx_retainer:page_read(<<"retained/+">>, 1, 10),
?assertEqual(0, erlang:length(List2)), ?assertEqual(0, erlang:length(List2)),
ok = emqtt:disconnect(C1) ok = emqtt:disconnect(C1)
@ -522,7 +522,7 @@ t_max_payload_size(_) ->
), ),
timer:sleep(500), timer:sleep(500),
{ok, List} = emqx_retainer:page_read(<<"retained/+">>, 1, 10), {ok, _, List} = emqx_retainer:page_read(<<"retained/+">>, 1, 10),
?assertEqual(1, erlang:length(List)), ?assertEqual(1, erlang:length(List)),
ok = emqtt:disconnect(C1) ok = emqtt:disconnect(C1)
@ -546,10 +546,10 @@ t_page_read(_) ->
lists:foreach(Fun, lists:seq(1, 9)), lists:foreach(Fun, lists:seq(1, 9)),
timer:sleep(200), timer:sleep(200),
{ok, List} = emqx_retainer:page_read(<<"retained/+">>, 1, 5), {ok, _, List} = emqx_retainer:page_read(<<"retained/+">>, 1, 5),
?assertEqual(5, length(List)), ?assertEqual(5, length(List)),
{ok, List2} = emqx_retainer:page_read(<<"retained/+">>, 2, 5), {ok, _, List2} = emqx_retainer:page_read(<<"retained/+">>, 2, 5),
?assertEqual(4, length(List2)), ?assertEqual(4, length(List2)),
ok = emqtt:disconnect(C1). ok = emqtt:disconnect(C1).