Merge pull request #12272 from lafirest/feat/retain

feat(retain): add two new endpoints for retain API
This commit is contained in:
lafirest 2024-01-09 15:59:42 +08:00 committed by GitHub
commit cc00dd80ee
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 74 additions and 13 deletions

View File

@ -2,7 +2,7 @@
{application, emqx_retainer, [
{description, "EMQX Retainer"},
% strict semver, bump manually!
{vsn, "5.0.19"},
{vsn, "5.0.20"},
{modules, []},
{registered, [emqx_retainer_sup]},
{applications, [kernel, stdlib, emqx, emqx_ctl]},

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@ -25,7 +25,7 @@
-export([api_spec/0, paths/0, schema/1, namespace/0, fields/1]).
-export([
lookup_retained_warp/2,
'/messages'/2,
with_topic_warp/2,
config/2
]).
@ -69,11 +69,11 @@ schema(?PREFIX) ->
};
schema(?PREFIX ++ "/messages") ->
#{
'operationId' => lookup_retained_warp,
'operationId' => '/messages',
get => #{
tags => ?TAGS,
description => ?DESC(list_retained_api),
parameters => page_params(),
parameters => parameters(query, false, query_match_topic) ++ page_params(),
responses => #{
200 => [
{data, mk(array(ref(message_summary)), #{desc => ?DESC(retained_list)})},
@ -81,6 +81,13 @@ schema(?PREFIX ++ "/messages") ->
],
400 => error_codes(['BAD_REQUEST'], ?DESC(unsupported_backend))
}
},
delete => #{
tags => ?TAGS,
description => ?DESC(delete_messages),
responses => #{
204 => <<>>
}
}
};
schema(?PREFIX ++ "/message/:topic") ->
@ -118,12 +125,15 @@ conf_schema() ->
ref(emqx_retainer_schema, "retainer").
parameters() ->
parameters(path, true, topic).
parameters(In, Required, Desc) ->
[
{topic,
mk(binary(), #{
in => path,
required => true,
desc => ?DESC(topic)
in => In,
required => Required,
desc => ?DESC(Desc)
})}
].
@ -142,8 +152,10 @@ fields(message) ->
| fields(message_summary)
].
lookup_retained_warp(Type, Params) ->
check_backend(Type, Params, fun lookup_retained/2).
'/messages'(get, Params) ->
check_backend(get, Params, fun lookup_retained/2);
'/messages'(delete, Params) ->
delete_messages(delete, Params).
with_topic_warp(Type, Params) ->
check_backend(Type, Params, fun with_topic/2).
@ -168,7 +180,8 @@ config(put, #{body := Body}) ->
lookup_retained(get, #{query_string := Qs}) ->
Page = maps:get(<<"page">>, Qs, 1),
Limit = maps:get(<<"limit">>, Qs, emqx_mgmt:default_row_limit()),
{ok, Msgs} = emqx_retainer_mnesia:page_read(undefined, undefined, Page, Limit),
Topic = maps:get(<<"topic">>, Qs, undefined),
{ok, Msgs} = emqx_retainer_mnesia:page_read(undefined, Topic, Page, Limit),
{200, #{
data => [format_message(Msg) || Msg <- Msgs],
meta => #{page => Page, limit => Limit, count => emqx_retainer_mnesia:size(?TAB_MESSAGE)}
@ -199,6 +212,10 @@ with_topic(delete, #{bindings := Bindings}) ->
{204}
end.
delete_messages(delete, _) ->
emqx_retainer:clean(),
{204}.
format_message(#message{
id = ID,
qos = Qos,

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@ -25,7 +25,9 @@
-define(CLUSTER_RPC_SHARD, emqx_cluster_rpc_shard).
-import(emqx_mgmt_api_test_util, [request_api/2, request_api/5, api_path/1, auth_header_/0]).
-import(emqx_mgmt_api_test_util, [
request_api/2, request_api/4, request_api/5, api_path/1, auth_header_/0
]).
all() ->
emqx_common_test_helpers:all(?MODULE).
@ -308,6 +310,37 @@ t_change_storage_type(_Config) ->
ok.
t_match_and_clean(_) ->
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
{ok, _} = emqtt:connect(C1),
emqx_retainer:clean(),
timer:sleep(300),
_ = [
emqtt:publish(C1, <<P/binary, "/", S/binary>>, <<"retained">>, [{qos, 0}, {retain, true}])
|| P <- [<<"t">>, <<"f">>], S <- [<<"1">>, <<"2">>, <<"3">>]
],
timer:sleep(1000),
API = api_path(["mqtt", "retainer", "messages"]),
{ok, LookupJson} = request_api(get, API, "topic=t/%2B", auth_header_()),
LookupResult = decode_json(LookupJson),
Expected = lists:usort([<<"t/1">>, <<"t/2">>, <<"t/3">>]),
?assertMatch(
Expected,
lists:usort([Topic || #{topic := Topic} <- maps:get(data, LookupResult)])
),
CleanAPI = api_path(["mqtt", "retainer", "messages"]),
{ok, []} = request_api(delete, CleanAPI),
{ok, LookupJson2} = request_api(get, API),
?assertMatch(#{data := []}, decode_json(LookupJson2)),
ok = emqtt:disconnect(C1).
%%--------------------------------------------------------------------
%% HTTP Request
%%--------------------------------------------------------------------

View File

@ -0,0 +1,3 @@
Added a new endpoint `DELETE /retainer/messages` to `retain` API to clean all retained messages
Also added an optional topic filter in the query string for the endpoint "GET /retainer/messages", e.g. "topic=t/1".

View File

@ -27,6 +27,11 @@ list_retained_api.desc:
list_retained_api.label:
"""List retained messages."""
delete_messages.desc:
"""Delete all retained messages"""
delete_messages.label:
"""Delete Retained Messages"""
lookup_api.desc:
"""Lookup a message by a topic without wildcards."""
lookup_api.label:
@ -56,6 +61,9 @@ retained_list.desc:
topic.desc:
"""Topic."""
query_match_topic.desc:
"""Topic filter, supports wildcards, omit this to match all messages."""
unsupported_backend.desc:
"""Unsupported backend."""