diff --git a/apps/emqx_retainer/src/emqx_retainer.app.src b/apps/emqx_retainer/src/emqx_retainer.app.src index 6c0def7ae..5d6fe95f7 100644 --- a/apps/emqx_retainer/src/emqx_retainer.app.src +++ b/apps/emqx_retainer/src/emqx_retainer.app.src @@ -2,7 +2,7 @@ {application, emqx_retainer, [ {description, "EMQX Retainer"}, % strict semver, bump manually! - {vsn, "5.0.19"}, + {vsn, "5.0.20"}, {modules, []}, {registered, [emqx_retainer_sup]}, {applications, [kernel, stdlib, emqx, emqx_ctl]}, diff --git a/apps/emqx_retainer/src/emqx_retainer_api.erl b/apps/emqx_retainer/src/emqx_retainer_api.erl index 446679325..5aa1b7a31 100644 --- a/apps/emqx_retainer/src/emqx_retainer_api.erl +++ b/apps/emqx_retainer/src/emqx_retainer_api.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -25,7 +25,7 @@ -export([api_spec/0, paths/0, schema/1, namespace/0, fields/1]). -export([ - lookup_retained_warp/2, + '/messages'/2, with_topic_warp/2, config/2 ]). @@ -69,11 +69,11 @@ schema(?PREFIX) -> }; schema(?PREFIX ++ "/messages") -> #{ - 'operationId' => lookup_retained_warp, + 'operationId' => '/messages', get => #{ tags => ?TAGS, description => ?DESC(list_retained_api), - parameters => page_params(), + parameters => parameters(query, false, query_match_topic) ++ page_params(), responses => #{ 200 => [ {data, mk(array(ref(message_summary)), #{desc => ?DESC(retained_list)})}, @@ -81,6 +81,13 @@ schema(?PREFIX ++ "/messages") -> ], 400 => error_codes(['BAD_REQUEST'], ?DESC(unsupported_backend)) } + }, + delete => #{ + tags => ?TAGS, + description => ?DESC(delete_messages), + responses => #{ + 204 => <<>> + } } }; schema(?PREFIX ++ "/message/:topic") -> @@ -118,12 +125,15 @@ conf_schema() -> ref(emqx_retainer_schema, "retainer"). parameters() -> + parameters(path, true, topic). + +parameters(In, Required, Desc) -> [ {topic, mk(binary(), #{ - in => path, - required => true, - desc => ?DESC(topic) + in => In, + required => Required, + desc => ?DESC(Desc) })} ]. @@ -142,8 +152,10 @@ fields(message) -> | fields(message_summary) ]. -lookup_retained_warp(Type, Params) -> - check_backend(Type, Params, fun lookup_retained/2). +'/messages'(get, Params) -> + check_backend(get, Params, fun lookup_retained/2); +'/messages'(delete, Params) -> + delete_messages(delete, Params). with_topic_warp(Type, Params) -> check_backend(Type, Params, fun with_topic/2). @@ -168,7 +180,8 @@ config(put, #{body := Body}) -> lookup_retained(get, #{query_string := Qs}) -> Page = maps:get(<<"page">>, Qs, 1), Limit = maps:get(<<"limit">>, Qs, emqx_mgmt:default_row_limit()), - {ok, Msgs} = emqx_retainer_mnesia:page_read(undefined, undefined, Page, Limit), + Topic = maps:get(<<"topic">>, Qs, undefined), + {ok, Msgs} = emqx_retainer_mnesia:page_read(undefined, Topic, Page, Limit), {200, #{ data => [format_message(Msg) || Msg <- Msgs], meta => #{page => Page, limit => Limit, count => emqx_retainer_mnesia:size(?TAB_MESSAGE)} @@ -199,6 +212,10 @@ with_topic(delete, #{bindings := Bindings}) -> {204} end. +delete_messages(delete, _) -> + emqx_retainer:clean(), + {204}. + format_message(#message{ id = ID, qos = Qos, diff --git a/apps/emqx_retainer/test/emqx_retainer_api_SUITE.erl b/apps/emqx_retainer/test/emqx_retainer_api_SUITE.erl index d00ade556..fb4dea16d 100644 --- a/apps/emqx_retainer/test/emqx_retainer_api_SUITE.erl +++ b/apps/emqx_retainer/test/emqx_retainer_api_SUITE.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -25,7 +25,9 @@ -define(CLUSTER_RPC_SHARD, emqx_cluster_rpc_shard). --import(emqx_mgmt_api_test_util, [request_api/2, request_api/5, api_path/1, auth_header_/0]). +-import(emqx_mgmt_api_test_util, [ + request_api/2, request_api/4, request_api/5, api_path/1, auth_header_/0 +]). all() -> emqx_common_test_helpers:all(?MODULE). @@ -308,6 +310,37 @@ t_change_storage_type(_Config) -> ok. +t_match_and_clean(_) -> + {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]), + {ok, _} = emqtt:connect(C1), + emqx_retainer:clean(), + timer:sleep(300), + + _ = [ + emqtt:publish(C1, <

>, <<"retained">>, [{qos, 0}, {retain, true}]) + || P <- [<<"t">>, <<"f">>], S <- [<<"1">>, <<"2">>, <<"3">>] + ], + + timer:sleep(1000), + + API = api_path(["mqtt", "retainer", "messages"]), + {ok, LookupJson} = request_api(get, API, "topic=t/%2B", auth_header_()), + LookupResult = decode_json(LookupJson), + + Expected = lists:usort([<<"t/1">>, <<"t/2">>, <<"t/3">>]), + ?assertMatch( + Expected, + lists:usort([Topic || #{topic := Topic} <- maps:get(data, LookupResult)]) + ), + + CleanAPI = api_path(["mqtt", "retainer", "messages"]), + {ok, []} = request_api(delete, CleanAPI), + + {ok, LookupJson2} = request_api(get, API), + ?assertMatch(#{data := []}, decode_json(LookupJson2)), + + ok = emqtt:disconnect(C1). + %%-------------------------------------------------------------------- %% HTTP Request %%-------------------------------------------------------------------- diff --git a/changes/ce/feat-12272.en.md b/changes/ce/feat-12272.en.md new file mode 100644 index 000000000..b86bdf57d --- /dev/null +++ b/changes/ce/feat-12272.en.md @@ -0,0 +1,3 @@ +Added a new endpoint `DELETE /retainer/messages` to `retain` API to clean all retained messages + +Also added an optional topic filter in the query string for the endpoint "GET /retainer/messages", e.g. "topic=t/1". diff --git a/rel/i18n/emqx_retainer_api.hocon b/rel/i18n/emqx_retainer_api.hocon index cdb49f8d0..87ded1165 100644 --- a/rel/i18n/emqx_retainer_api.hocon +++ b/rel/i18n/emqx_retainer_api.hocon @@ -27,6 +27,11 @@ list_retained_api.desc: list_retained_api.label: """List retained messages.""" +delete_messages.desc: +"""Delete all retained messages""" +delete_messages.label: +"""Delete Retained Messages""" + lookup_api.desc: """Lookup a message by a topic without wildcards.""" lookup_api.label: @@ -56,6 +61,9 @@ retained_list.desc: topic.desc: """Topic.""" +query_match_topic.desc: +"""Topic filter, supports wildcards, omit this to match all messages.""" + unsupported_backend.desc: """Unsupported backend."""