fix(retain): add `hasnext` into the meta data for the `GET /retain/messages`
This commit is contained in:
parent
7b211b5a23
commit
f896cf2f4f
|
@ -78,7 +78,7 @@
|
||||||
-callback store_retained(context(), message()) -> ok.
|
-callback store_retained(context(), message()) -> ok.
|
||||||
-callback read_message(context(), topic()) -> {ok, list()}.
|
-callback read_message(context(), topic()) -> {ok, list()}.
|
||||||
-callback page_read(context(), topic(), non_neg_integer(), non_neg_integer()) ->
|
-callback page_read(context(), topic(), non_neg_integer(), non_neg_integer()) ->
|
||||||
{ok, list()}.
|
{ok, HasNext :: boolean(), list()}.
|
||||||
-callback match_messages(context(), topic(), cursor()) -> {ok, list(), cursor()}.
|
-callback match_messages(context(), topic(), cursor()) -> {ok, list(), cursor()}.
|
||||||
-callback clear_expired(context()) -> ok.
|
-callback clear_expired(context()) -> ok.
|
||||||
-callback clean(context()) -> ok.
|
-callback clean(context()) -> ok.
|
||||||
|
|
|
@ -181,15 +181,27 @@ lookup_retained(get, #{query_string := Qs}) ->
|
||||||
Page = maps:get(<<"page">>, Qs, 1),
|
Page = maps:get(<<"page">>, Qs, 1),
|
||||||
Limit = maps:get(<<"limit">>, Qs, emqx_mgmt:default_row_limit()),
|
Limit = maps:get(<<"limit">>, Qs, emqx_mgmt:default_row_limit()),
|
||||||
Topic = maps:get(<<"topic">>, Qs, undefined),
|
Topic = maps:get(<<"topic">>, Qs, undefined),
|
||||||
{ok, Msgs} = emqx_retainer_mnesia:page_read(undefined, Topic, Page, Limit),
|
{ok, HasNext, Msgs} = emqx_retainer_mnesia:page_read(undefined, Topic, Page, Limit),
|
||||||
|
Meta =
|
||||||
|
case Topic of
|
||||||
|
undefined ->
|
||||||
|
#{count => emqx_retainer_mnesia:size(?TAB_MESSAGE)};
|
||||||
|
_ ->
|
||||||
|
#{}
|
||||||
|
end,
|
||||||
{200, #{
|
{200, #{
|
||||||
data => [format_message(Msg) || Msg <- Msgs],
|
data => [format_message(Msg) || Msg <- Msgs],
|
||||||
meta => #{page => Page, limit => Limit, count => emqx_retainer_mnesia:size(?TAB_MESSAGE)}
|
meta =>
|
||||||
|
Meta#{
|
||||||
|
page => Page,
|
||||||
|
limit => Limit,
|
||||||
|
hasnext => HasNext
|
||||||
|
}
|
||||||
}}.
|
}}.
|
||||||
|
|
||||||
with_topic(get, #{bindings := Bindings}) ->
|
with_topic(get, #{bindings := Bindings}) ->
|
||||||
Topic = maps:get(topic, Bindings),
|
Topic = maps:get(topic, Bindings),
|
||||||
{ok, Msgs} = emqx_retainer_mnesia:page_read(undefined, Topic, 1, 1),
|
{ok, _, Msgs} = emqx_retainer_mnesia:page_read(undefined, Topic, 1, 1),
|
||||||
case Msgs of
|
case Msgs of
|
||||||
[H | _] ->
|
[H | _] ->
|
||||||
{200, format_detail_message(H)};
|
{200, format_detail_message(H)};
|
||||||
|
@ -202,12 +214,12 @@ with_topic(get, #{bindings := Bindings}) ->
|
||||||
with_topic(delete, #{bindings := Bindings}) ->
|
with_topic(delete, #{bindings := Bindings}) ->
|
||||||
Topic = maps:get(topic, Bindings),
|
Topic = maps:get(topic, Bindings),
|
||||||
case emqx_retainer_mnesia:page_read(undefined, Topic, 1, 1) of
|
case emqx_retainer_mnesia:page_read(undefined, Topic, 1, 1) of
|
||||||
{ok, []} ->
|
{ok, _, []} ->
|
||||||
{404, #{
|
{404, #{
|
||||||
code => <<"NOT_FOUND">>,
|
code => <<"NOT_FOUND">>,
|
||||||
message => <<"Viewed message doesn't exist">>
|
message => <<"Viewed message doesn't exist">>
|
||||||
}};
|
}};
|
||||||
{ok, _} ->
|
{ok, _, _} ->
|
||||||
emqx_retainer_mnesia:delete_message(undefined, Topic),
|
emqx_retainer_mnesia:delete_message(undefined, Topic),
|
||||||
{204}
|
{204}
|
||||||
end.
|
end.
|
||||||
|
|
|
@ -231,20 +231,18 @@ page_read(_, Topic, Page, Limit) ->
|
||||||
false ->
|
false ->
|
||||||
more
|
more
|
||||||
end,
|
end,
|
||||||
PageRows =
|
|
||||||
case SkipResult of
|
case SkipResult of
|
||||||
closed ->
|
closed ->
|
||||||
[];
|
{ok, false, []};
|
||||||
more ->
|
more ->
|
||||||
case qlc_next_answers(Cursor, Limit) of
|
case qlc_next_answers(Cursor, Limit) of
|
||||||
{closed, Rows} ->
|
{closed, Rows} ->
|
||||||
Rows;
|
{ok, false, Rows};
|
||||||
{more, Rows} ->
|
{more, Rows} ->
|
||||||
qlc:delete_cursor(Cursor),
|
qlc:delete_cursor(Cursor),
|
||||||
Rows
|
{ok, true, Rows}
|
||||||
end
|
end
|
||||||
end,
|
end.
|
||||||
{ok, PageRows}.
|
|
||||||
|
|
||||||
clean(_) ->
|
clean(_) ->
|
||||||
_ = mria:clear_table(?TAB_MESSAGE),
|
_ = mria:clear_table(?TAB_MESSAGE),
|
||||||
|
|
|
@ -133,7 +133,7 @@ t_store_and_clean(_) ->
|
||||||
),
|
),
|
||||||
timer:sleep(100),
|
timer:sleep(100),
|
||||||
|
|
||||||
{ok, List} = emqx_retainer:page_read(<<"retained">>, 1, 10),
|
{ok, _, List} = emqx_retainer:page_read(<<"retained">>, 1, 10),
|
||||||
?assertEqual(1, length(List)),
|
?assertEqual(1, length(List)),
|
||||||
?assertMatch(
|
?assertMatch(
|
||||||
{ok, [#message{payload = <<"this is a retained message">>}]},
|
{ok, [#message{payload = <<"this is a retained message">>}]},
|
||||||
|
@ -159,7 +159,7 @@ t_store_and_clean(_) ->
|
||||||
),
|
),
|
||||||
|
|
||||||
ok = emqx_retainer:clean(),
|
ok = emqx_retainer:clean(),
|
||||||
{ok, List2} = emqx_retainer:page_read(<<"retained">>, 1, 10),
|
{ok, _, List2} = emqx_retainer:page_read(<<"retained">>, 1, 10),
|
||||||
?assertEqual(0, length(List2)),
|
?assertEqual(0, length(List2)),
|
||||||
?assertMatch(
|
?assertMatch(
|
||||||
{ok, []},
|
{ok, []},
|
||||||
|
@ -486,12 +486,12 @@ t_clear_expired(_) ->
|
||||||
),
|
),
|
||||||
timer:sleep(1000),
|
timer:sleep(1000),
|
||||||
|
|
||||||
{ok, List} = emqx_retainer:page_read(<<"retained/+">>, 1, 10),
|
{ok, _, List} = emqx_retainer:page_read(<<"retained/+">>, 1, 10),
|
||||||
?assertEqual(5, erlang:length(List)),
|
?assertEqual(5, erlang:length(List)),
|
||||||
|
|
||||||
timer:sleep(4500),
|
timer:sleep(4500),
|
||||||
|
|
||||||
{ok, List2} = emqx_retainer:page_read(<<"retained/+">>, 1, 10),
|
{ok, _, List2} = emqx_retainer:page_read(<<"retained/+">>, 1, 10),
|
||||||
?assertEqual(0, erlang:length(List2)),
|
?assertEqual(0, erlang:length(List2)),
|
||||||
|
|
||||||
ok = emqtt:disconnect(C1)
|
ok = emqtt:disconnect(C1)
|
||||||
|
@ -522,7 +522,7 @@ t_max_payload_size(_) ->
|
||||||
),
|
),
|
||||||
|
|
||||||
timer:sleep(500),
|
timer:sleep(500),
|
||||||
{ok, List} = emqx_retainer:page_read(<<"retained/+">>, 1, 10),
|
{ok, _, List} = emqx_retainer:page_read(<<"retained/+">>, 1, 10),
|
||||||
?assertEqual(1, erlang:length(List)),
|
?assertEqual(1, erlang:length(List)),
|
||||||
|
|
||||||
ok = emqtt:disconnect(C1)
|
ok = emqtt:disconnect(C1)
|
||||||
|
@ -546,10 +546,10 @@ t_page_read(_) ->
|
||||||
lists:foreach(Fun, lists:seq(1, 9)),
|
lists:foreach(Fun, lists:seq(1, 9)),
|
||||||
timer:sleep(200),
|
timer:sleep(200),
|
||||||
|
|
||||||
{ok, List} = emqx_retainer:page_read(<<"retained/+">>, 1, 5),
|
{ok, _, List} = emqx_retainer:page_read(<<"retained/+">>, 1, 5),
|
||||||
?assertEqual(5, length(List)),
|
?assertEqual(5, length(List)),
|
||||||
|
|
||||||
{ok, List2} = emqx_retainer:page_read(<<"retained/+">>, 2, 5),
|
{ok, _, List2} = emqx_retainer:page_read(<<"retained/+">>, 2, 5),
|
||||||
?assertEqual(4, length(List2)),
|
?assertEqual(4, length(List2)),
|
||||||
|
|
||||||
ok = emqtt:disconnect(C1).
|
ok = emqtt:disconnect(C1).
|
||||||
|
|
Loading…
Reference in New Issue