From 9ce96bafa354452e333c638dde7a5f751aa6f206 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Tue, 9 Jan 2024 15:06:06 +0800 Subject: [PATCH 1/2] feat: support to delete delayed messages via topic name --- apps/emqx_modules/src/emqx_delayed.erl | 17 ++++++ apps/emqx_modules/src/emqx_delayed_api.erl | 60 ++++++++++++++++++- .../test/emqx_delayed_api_SUITE.erl | 35 +++++++++++ rel/i18n/emqx_delayed_api.hocon | 10 ++++ 4 files changed, 121 insertions(+), 1 deletion(-) diff --git a/apps/emqx_modules/src/emqx_delayed.erl b/apps/emqx_modules/src/emqx_delayed.erl index 7cf9018bc..8325a41d3 100644 --- a/apps/emqx_modules/src/emqx_delayed.erl +++ b/apps/emqx_modules/src/emqx_delayed.erl @@ -58,6 +58,7 @@ get_delayed_message/2, delete_delayed_message/1, delete_delayed_message/2, + delete_delayed_messages_by_topic_name/1, clear_all/0, %% rpc target clear_all_local/0, @@ -95,6 +96,13 @@ %% sync ms with record change -define(QUERY_MS(Id), [{{delayed_message, {'_', Id}, '_', '_'}, [], ['$_']}]). -define(DELETE_MS(Id), [{{delayed_message, {'$1', Id}, '_', '_'}, [], ['$1']}]). +-define(DELETE_BY_TOPIC_MS(Topic), [ + { + {delayed_message, '$1', '_', {message, '_', '_', '_', '_', '_', Topic, '_', '_', '_'}}, + [], + ['$1'] + } +]). -define(TAB, ?MODULE). -define(SERVER, ?MODULE). @@ -267,6 +275,15 @@ delete_delayed_message(Node, Id) when Node =:= node() -> delete_delayed_message(Node, Id) -> emqx_delayed_proto_v2:delete_delayed_message(Node, Id). +-spec delete_delayed_messages_by_topic_name(binary()) -> with_id_return(). +delete_delayed_messages_by_topic_name(TopicName) when is_binary(TopicName) -> + case ets:select(?TAB, ?DELETE_BY_TOPIC_MS(TopicName)) of + [] -> + {error, not_found}; + Rows -> + lists:foreach(fun(Key) -> mria:dirty_delete(?TAB, Key) end, Rows) + end. + -spec clear_all() -> ok. clear_all() -> Nodes = emqx:running_nodes(), diff --git a/apps/emqx_modules/src/emqx_delayed_api.erl b/apps/emqx_modules/src/emqx_delayed_api.erl index 766d23d6b..c4b86eeb3 100644 --- a/apps/emqx_modules/src/emqx_delayed_api.erl +++ b/apps/emqx_modules/src/emqx_delayed_api.erl @@ -27,7 +27,8 @@ -export([ status/2, delayed_messages/2, - delayed_message/2 + delayed_message/2, + delayed_message_topic/2 ]). -export([ @@ -51,6 +52,9 @@ -define(MESSAGE_ID_SCHEMA_ERROR, 'MESSAGE_ID_SCHEMA_ERROR'). -define(INVALID_NODE, 'INVALID_NODE'). +-define(INVALID_TOPIC, 'INVALID_TOPIC_NAME'). +-define(MESSAGE_NOT_FOUND, 'MESSAGE_NOT_FOUND'). + api_spec() -> emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}). @@ -58,6 +62,7 @@ paths() -> [ "/mqtt/delayed", "/mqtt/delayed/messages", + "/mqtt/delayed/messages/:topic", "/mqtt/delayed/messages/:node/:msgid" ]. @@ -87,6 +92,32 @@ schema("/mqtt/delayed") -> } } }; +schema("/mqtt/delayed/messages/:topic") -> + #{ + 'operationId' => delayed_message_topic, + delete => #{ + tags => ?API_TAG_MQTT, + description => ?DESC(delete_api), + parameters => [ + {topic, + mk( + binary(), + #{in => path, desc => ?DESC(topic)} + )} + ], + responses => #{ + 204 => <<"Delete delayed message success">>, + 400 => emqx_dashboard_swagger:error_codes( + [?INVALID_TOPIC], + ?DESC(bad_topic_name) + ), + 404 => emqx_dashboard_swagger:error_codes( + [?MESSAGE_NOT_FOUND], + ?DESC(no_delayed_message) + ) + } + } + }; schema("/mqtt/delayed/messages/:node/:msgid") -> #{ 'operationId' => delayed_message, @@ -223,6 +254,19 @@ delayed_message(delete, #{bindings := #{node := NodeBin, msgid := HexId}}) -> end ). +delayed_message_topic(delete, #{bindings := #{topic := Topic}}) -> + MaybeTopic = make_maybe(Topic, invalid_topic_name, fun validate_topic_name/1), + with_maybe( + [MaybeTopic], + fun(TopicName) -> + case emqx_delayed:delete_delayed_messages_by_topic_name(TopicName) of + ok -> + {204}; + {error, not_found} -> + {404, generate_http_code_map(message_not_found, TopicName)} + end + end + ). %%-------------------------------------------------------------------- %% internal function %%-------------------------------------------------------------------- @@ -279,6 +323,12 @@ generate_http_code_map(not_found, Id) -> message => iolist_to_binary(io_lib:format("Message ID ~s not found", [Id])) }; +generate_http_code_map(message_not_found, Topic) -> + #{ + code => ?MESSAGE_NOT_FOUND, + message => + iolist_to_binary(io_lib:format("Not found messages for ~s", [Topic])) + }; generate_http_code_map(invalid_node, Node) -> #{ code => ?INVALID_NODE, @@ -295,6 +345,14 @@ make_maybe(X, Error, Fun) -> {left, X, Error} end. +validate_topic_name(Topic) -> + case emqx_topic:wildcard(Topic) of + true -> + error(badarg); + false -> + Topic + end. + with_maybe(Maybes, Cont) -> with_maybe(Maybes, Cont, []). diff --git a/apps/emqx_modules/test/emqx_delayed_api_SUITE.erl b/apps/emqx_modules/test/emqx_delayed_api_SUITE.erl index b5995a47a..3934c3a1a 100644 --- a/apps/emqx_modules/test/emqx_delayed_api_SUITE.erl +++ b/apps/emqx_modules/test/emqx_delayed_api_SUITE.erl @@ -189,6 +189,30 @@ t_messages(_) -> ok = emqtt:disconnect(C1). +t_delete_messages_via_topic(_) -> + clear_all_record(), + emqx_delayed:load(), + + OriginTopic = <<"t/a">>, + Topic = <<"$delayed/123/", OriginTopic/binary>>, + + publish_a_delayed_message(Topic), + publish_a_delayed_message(Topic), + + %% assert: delayed messages are saved + ?assertMatch([_, _], get_messages(2)), + + %% delete these messages via topic + TopicInUrl = uri_string:quote(OriginTopic), + {ok, 204, _} = request( + delete, + uri(["mqtt", "delayed", "messages", TopicInUrl]) + ), + + %% assert: messages are deleted + ?assertEqual([], get_messages(0)), + ok. + t_large_payload(_) -> clear_all_record(), emqx_delayed:load(), @@ -246,3 +270,14 @@ get_messages(Len) -> ) ), Msgs. + +publish_a_delayed_message(Topic) -> + {ok, C1} = emqtt:start_link([{clean_start, true}]), + {ok, _} = emqtt:connect(C1), + emqtt:publish( + C1, + Topic, + <<"This is a delayed messages">>, + [{qos, 1}] + ), + ok = emqtt:disconnect(C1). diff --git a/rel/i18n/emqx_delayed_api.hocon b/rel/i18n/emqx_delayed_api.hocon index b891c618c..4341e4e91 100644 --- a/rel/i18n/emqx_delayed_api.hocon +++ b/rel/i18n/emqx_delayed_api.hocon @@ -65,6 +65,16 @@ msgid_not_found.desc: msgid_not_found.label: """Message ID not found""" +bad_topic_name.desc: +"""Bad Topic Name""" +bad_topic_name.label: +"""Bad Topic Name""" + +no_delayed_message.desc: +"""Not found delayed message for this topic""" +no_delayed_message.label: +"""Not found delayed message for this topic""" + node.desc: """The node where message from""" node.label: From 147b5f8953638aa06f969a5de1d410c43ae5c15e Mon Sep 17 00:00:00 2001 From: JianBo He Date: Tue, 9 Jan 2024 15:11:01 +0800 Subject: [PATCH 2/2] chore: update changes --- changes/ce/feat-12277.en.md | 1 + 1 file changed, 1 insertion(+) create mode 100644 changes/ce/feat-12277.en.md diff --git a/changes/ce/feat-12277.en.md b/changes/ce/feat-12277.en.md new file mode 100644 index 000000000..d491049f6 --- /dev/null +++ b/changes/ce/feat-12277.en.md @@ -0,0 +1 @@ +Add `mqtt/delayed/messages/:topic` endpoint to remove delayed messages by topic name.