From eea0ec135fdf440e3debaafd47773dbf863cf4f7 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Thu, 11 Jan 2024 10:36:55 +0800 Subject: [PATCH 1/3] fix(delayed): fix http 500 error --- apps/emqx_modules/src/emqx_delayed_api.erl | 6 ++++++ .../test/emqx_delayed_api_SUITE.erl | 19 +++++++++++++++++++ 2 files changed, 25 insertions(+) diff --git a/apps/emqx_modules/src/emqx_delayed_api.erl b/apps/emqx_modules/src/emqx_delayed_api.erl index c4b86eeb3..f6ea99c12 100644 --- a/apps/emqx_modules/src/emqx_delayed_api.erl +++ b/apps/emqx_modules/src/emqx_delayed_api.erl @@ -329,6 +329,12 @@ generate_http_code_map(message_not_found, Topic) -> message => iolist_to_binary(io_lib:format("Not found messages for ~s", [Topic])) }; +generate_http_code_map(invalid_topic_name, Topic) -> + #{ + code => ?INVALID_TOPIC, + message => + iolist_to_binary(io_lib:format("The topic name ~s is invalid", [Topic])) + }; generate_http_code_map(invalid_node, Node) -> #{ code => ?INVALID_NODE, diff --git a/apps/emqx_modules/test/emqx_delayed_api_SUITE.erl b/apps/emqx_modules/test/emqx_delayed_api_SUITE.erl index 3934c3a1a..d1e747b19 100644 --- a/apps/emqx_modules/test/emqx_delayed_api_SUITE.erl +++ b/apps/emqx_modules/test/emqx_delayed_api_SUITE.erl @@ -211,6 +211,25 @@ t_delete_messages_via_topic(_) -> %% assert: messages are deleted ?assertEqual([], get_messages(0)), + + %% assert: return 400 if the topic parameter is invalid + TopicFilter = uri_string:quote(<<"t/#">>), + ?assertMatch( + {ok, 400, _}, + request( + delete, + uri(["mqtt", "delayed", "messages", TopicFilter]) + ) + ), + + %% assert: return 404 if no messages found for the topic + ?assertMatch( + {ok, 404, _}, + request( + delete, + uri(["mqtt", "delayed", "messages", TopicInUrl]) + ) + ), ok. t_large_payload(_) -> From c4dd083fd9690bcd3ca029e7ea96c802de698f35 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Thu, 11 Jan 2024 14:12:30 +0800 Subject: [PATCH 2/3] fix(delayed): delete delayed messge on all nodes --- apps/emqx_modules/src/emqx_delayed.erl | 36 ++++++++++++ .../src/proto/emqx_delayed_proto_v3.erl | 56 +++++++++++++++++++ 2 files changed, 92 insertions(+) create mode 100644 apps/emqx_modules/src/proto/emqx_delayed_proto_v3.erl diff --git a/apps/emqx_modules/src/emqx_delayed.erl b/apps/emqx_modules/src/emqx_delayed.erl index 8325a41d3..d2ed12332 100644 --- a/apps/emqx_modules/src/emqx_delayed.erl +++ b/apps/emqx_modules/src/emqx_delayed.erl @@ -65,6 +65,11 @@ cluster_list/1 ]). +%% exports for internal rpc +-export([ + do_delete_delayed_messages_by_topic_name/1 +]). + %% exports for query -export([ qs2ms/2, @@ -277,6 +282,37 @@ delete_delayed_message(Node, Id) -> -spec delete_delayed_messages_by_topic_name(binary()) -> with_id_return(). delete_delayed_messages_by_topic_name(TopicName) when is_binary(TopicName) -> + Nodes = emqx:running_nodes(), + Result = emqx_delayed_proto_v3:delete_delayed_messages_by_topic_name(Nodes, TopicName), + case + lists:any( + fun + ({ok, ok}) -> true; + (_) -> false + end, + Result + ) + of + true -> + ok; + false -> + Errors = lists:filter( + fun + ({ok, {error, not_found}}) -> false; + (_) -> true + end, + Result + ), + case Errors of + [] -> + {error, not_found}; + [Exception | _] -> + {error, Exception} + end + end. + +-spec do_delete_delayed_messages_by_topic_name(binary()) -> with_id_return(). +do_delete_delayed_messages_by_topic_name(TopicName) when is_binary(TopicName) -> case ets:select(?TAB, ?DELETE_BY_TOPIC_MS(TopicName)) of [] -> {error, not_found}; diff --git a/apps/emqx_modules/src/proto/emqx_delayed_proto_v3.erl b/apps/emqx_modules/src/proto/emqx_delayed_proto_v3.erl new file mode 100644 index 000000000..651c616ce --- /dev/null +++ b/apps/emqx_modules/src/proto/emqx_delayed_proto_v3.erl @@ -0,0 +1,56 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_delayed_proto_v3). + +-behaviour(emqx_bpapi). + +-export([ + introduced_in/0, + get_delayed_message/2, + delete_delayed_message/2, + + %% Introduced in v2: + clear_all/1, + %% Introduced in v3: + delete_delayed_messages_by_topic_name/2 +]). + +-include_lib("emqx/include/bpapi.hrl"). + +introduced_in() -> + "5.5.0". + +-spec get_delayed_message(node(), binary()) -> + emqx_delayed:with_id_return(map()) | emqx_rpc:badrpc(). +get_delayed_message(Node, Id) -> + rpc:call(Node, emqx_delayed, get_delayed_message, [Id]). + +-spec delete_delayed_message(node(), binary()) -> emqx_delayed:with_id_return() | emqx_rpc:badrpc(). +delete_delayed_message(Node, Id) -> + rpc:call(Node, emqx_delayed, delete_delayed_message, [Id]). + +%% Introduced in v2: + +-spec clear_all([node()]) -> emqx_rpc:erpc_multicall(ok). +clear_all(Nodes) -> + erpc:multicall(Nodes, emqx_delayed, clear_all_local, []). + +%% Introduced in v3: + +-spec delete_delayed_messages_by_topic_name(list(), binary()) -> emqx_rpc:erpc_multicall(ok). +delete_delayed_messages_by_topic_name(Nodes, TopicName) -> + erpc:multicall(Nodes, emqx_delayed, do_delete_delayed_messages_by_topic_name, [TopicName]). From 596607d5495295c199cb6ce665e18d60975b9ea2 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Thu, 11 Jan 2024 14:56:02 +0800 Subject: [PATCH 3/3] chore: update bpapi.versions --- apps/emqx/priv/bpapi.versions | 1 + 1 file changed, 1 insertion(+) diff --git a/apps/emqx/priv/bpapi.versions b/apps/emqx/priv/bpapi.versions index 208c34e30..2777aec53 100644 --- a/apps/emqx/priv/bpapi.versions +++ b/apps/emqx/priv/bpapi.versions @@ -18,6 +18,7 @@ {emqx_dashboard,1}. {emqx_delayed,1}. {emqx_delayed,2}. +{emqx_delayed,3}. {emqx_ds,1}. {emqx_ds,2}. {emqx_eviction_agent,1}.