diff --git a/apps/emqx_retainer/src/emqx_retainer_api.erl b/apps/emqx_retainer/src/emqx_retainer_api.erl index 446679325..5c81bd52e 100644 --- a/apps/emqx_retainer/src/emqx_retainer_api.erl +++ b/apps/emqx_retainer/src/emqx_retainer_api.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -25,7 +25,7 @@ -export([api_spec/0, paths/0, schema/1, namespace/0, fields/1]). -export([ - lookup_retained_warp/2, + '/messages'/2, with_topic_warp/2, config/2 ]). @@ -69,11 +69,11 @@ schema(?PREFIX) -> }; schema(?PREFIX ++ "/messages") -> #{ - 'operationId' => lookup_retained_warp, + 'operationId' => '/messages', get => #{ tags => ?TAGS, description => ?DESC(list_retained_api), - parameters => page_params(), + parameters => parameters(query, false, query_match_topic) ++ page_params(), responses => #{ 200 => [ {data, mk(array(ref(message_summary)), #{desc => ?DESC(retained_list)})}, @@ -81,6 +81,13 @@ schema(?PREFIX ++ "/messages") -> ], 400 => error_codes(['BAD_REQUEST'], ?DESC(unsupported_backend)) } + }, + delete => #{ + tags => ?TAGS, + description => ?DESC(delete_messages), + responses => #{ + 204 => <<>> + } } }; schema(?PREFIX ++ "/message/:topic") -> @@ -118,12 +125,15 @@ conf_schema() -> ref(emqx_retainer_schema, "retainer"). parameters() -> + parameters(path, true, topic). + +parameters(In, Required, Desc) -> [ {topic, mk(binary(), #{ - in => path, - required => true, - desc => ?DESC(topic) + in => In, + required => Required, + desc => ?DESC(Desc) })} ]. @@ -142,8 +152,10 @@ fields(message) -> | fields(message_summary) ]. -lookup_retained_warp(Type, Params) -> - check_backend(Type, Params, fun lookup_retained/2). +'/messages'(get, Params) -> + check_backend(get, Params, fun lookup_retained/2); +'/messages'(delete, Params) -> + delete_messages(delete, Params). with_topic_warp(Type, Params) -> check_backend(Type, Params, fun with_topic/2). @@ -168,7 +180,9 @@ config(put, #{body := Body}) -> lookup_retained(get, #{query_string := Qs}) -> Page = maps:get(<<"page">>, Qs, 1), Limit = maps:get(<<"limit">>, Qs, emqx_mgmt:default_row_limit()), - {ok, Msgs} = emqx_retainer_mnesia:page_read(undefined, undefined, Page, Limit), + Topic = maps:get(<<"topic">>, Qs, undefined), + ct:print("Qs:~p~n", [Qs]), + {ok, Msgs} = emqx_retainer_mnesia:page_read(undefined, Topic, Page, Limit), {200, #{ data => [format_message(Msg) || Msg <- Msgs], meta => #{page => Page, limit => Limit, count => emqx_retainer_mnesia:size(?TAB_MESSAGE)} @@ -199,6 +213,10 @@ with_topic(delete, #{bindings := Bindings}) -> {204} end. +delete_messages(delete, _) -> + emqx_retainer:clean(), + {204}. + format_message(#message{ id = ID, qos = Qos, diff --git a/apps/emqx_retainer/test/emqx_retainer_api_SUITE.erl b/apps/emqx_retainer/test/emqx_retainer_api_SUITE.erl index d00ade556..fb4dea16d 100644 --- a/apps/emqx_retainer/test/emqx_retainer_api_SUITE.erl +++ b/apps/emqx_retainer/test/emqx_retainer_api_SUITE.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -25,7 +25,9 @@ -define(CLUSTER_RPC_SHARD, emqx_cluster_rpc_shard). --import(emqx_mgmt_api_test_util, [request_api/2, request_api/5, api_path/1, auth_header_/0]). +-import(emqx_mgmt_api_test_util, [ + request_api/2, request_api/4, request_api/5, api_path/1, auth_header_/0 +]). all() -> emqx_common_test_helpers:all(?MODULE). @@ -308,6 +310,37 @@ t_change_storage_type(_Config) -> ok. +t_match_and_clean(_) -> + {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]), + {ok, _} = emqtt:connect(C1), + emqx_retainer:clean(), + timer:sleep(300), + + _ = [ + emqtt:publish(C1, <

>, <<"retained">>, [{qos, 0}, {retain, true}]) + || P <- [<<"t">>, <<"f">>], S <- [<<"1">>, <<"2">>, <<"3">>] + ], + + timer:sleep(1000), + + API = api_path(["mqtt", "retainer", "messages"]), + {ok, LookupJson} = request_api(get, API, "topic=t/%2B", auth_header_()), + LookupResult = decode_json(LookupJson), + + Expected = lists:usort([<<"t/1">>, <<"t/2">>, <<"t/3">>]), + ?assertMatch( + Expected, + lists:usort([Topic || #{topic := Topic} <- maps:get(data, LookupResult)]) + ), + + CleanAPI = api_path(["mqtt", "retainer", "messages"]), + {ok, []} = request_api(delete, CleanAPI), + + {ok, LookupJson2} = request_api(get, API), + ?assertMatch(#{data := []}, decode_json(LookupJson2)), + + ok = emqtt:disconnect(C1). + %%-------------------------------------------------------------------- %% HTTP Request %%-------------------------------------------------------------------- diff --git a/rel/i18n/emqx_retainer_api.hocon b/rel/i18n/emqx_retainer_api.hocon index cdb49f8d0..f4bd00ae9 100644 --- a/rel/i18n/emqx_retainer_api.hocon +++ b/rel/i18n/emqx_retainer_api.hocon @@ -27,8 +27,13 @@ list_retained_api.desc: list_retained_api.label: """List retained messages.""" +delete_messages.desc: +"""Delete all retained messages""" +delete_messages.label: +"""Delete Retained Messages""" + lookup_api.desc: -"""Lookup a message by a topic without wildcards.""" +"""Lookup a message by a topic without wildcards, .""" lookup_api.label: """Lookup a message""" @@ -56,6 +61,9 @@ retained_list.desc: topic.desc: """Topic.""" +query_match_topic.desc: +"""Topic filter, supports wildcards, omit this to match all messages.""" + unsupported_backend.desc: """Unsupported backend."""