From c4dd083fd9690bcd3ca029e7ea96c802de698f35 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Thu, 11 Jan 2024 14:12:30 +0800 Subject: [PATCH] fix(delayed): delete delayed messge on all nodes --- apps/emqx_modules/src/emqx_delayed.erl | 36 ++++++++++++ .../src/proto/emqx_delayed_proto_v3.erl | 56 +++++++++++++++++++ 2 files changed, 92 insertions(+) create mode 100644 apps/emqx_modules/src/proto/emqx_delayed_proto_v3.erl diff --git a/apps/emqx_modules/src/emqx_delayed.erl b/apps/emqx_modules/src/emqx_delayed.erl index 8325a41d3..d2ed12332 100644 --- a/apps/emqx_modules/src/emqx_delayed.erl +++ b/apps/emqx_modules/src/emqx_delayed.erl @@ -65,6 +65,11 @@ cluster_list/1 ]). +%% exports for internal rpc +-export([ + do_delete_delayed_messages_by_topic_name/1 +]). + %% exports for query -export([ qs2ms/2, @@ -277,6 +282,37 @@ delete_delayed_message(Node, Id) -> -spec delete_delayed_messages_by_topic_name(binary()) -> with_id_return(). delete_delayed_messages_by_topic_name(TopicName) when is_binary(TopicName) -> + Nodes = emqx:running_nodes(), + Result = emqx_delayed_proto_v3:delete_delayed_messages_by_topic_name(Nodes, TopicName), + case + lists:any( + fun + ({ok, ok}) -> true; + (_) -> false + end, + Result + ) + of + true -> + ok; + false -> + Errors = lists:filter( + fun + ({ok, {error, not_found}}) -> false; + (_) -> true + end, + Result + ), + case Errors of + [] -> + {error, not_found}; + [Exception | _] -> + {error, Exception} + end + end. + +-spec do_delete_delayed_messages_by_topic_name(binary()) -> with_id_return(). +do_delete_delayed_messages_by_topic_name(TopicName) when is_binary(TopicName) -> case ets:select(?TAB, ?DELETE_BY_TOPIC_MS(TopicName)) of [] -> {error, not_found}; diff --git a/apps/emqx_modules/src/proto/emqx_delayed_proto_v3.erl b/apps/emqx_modules/src/proto/emqx_delayed_proto_v3.erl new file mode 100644 index 000000000..651c616ce --- /dev/null +++ b/apps/emqx_modules/src/proto/emqx_delayed_proto_v3.erl @@ -0,0 +1,56 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_delayed_proto_v3). + +-behaviour(emqx_bpapi). + +-export([ + introduced_in/0, + get_delayed_message/2, + delete_delayed_message/2, + + %% Introduced in v2: + clear_all/1, + %% Introduced in v3: + delete_delayed_messages_by_topic_name/2 +]). + +-include_lib("emqx/include/bpapi.hrl"). + +introduced_in() -> + "5.5.0". + +-spec get_delayed_message(node(), binary()) -> + emqx_delayed:with_id_return(map()) | emqx_rpc:badrpc(). +get_delayed_message(Node, Id) -> + rpc:call(Node, emqx_delayed, get_delayed_message, [Id]). + +-spec delete_delayed_message(node(), binary()) -> emqx_delayed:with_id_return() | emqx_rpc:badrpc(). +delete_delayed_message(Node, Id) -> + rpc:call(Node, emqx_delayed, delete_delayed_message, [Id]). + +%% Introduced in v2: + +-spec clear_all([node()]) -> emqx_rpc:erpc_multicall(ok). +clear_all(Nodes) -> + erpc:multicall(Nodes, emqx_delayed, clear_all_local, []). + +%% Introduced in v3: + +-spec delete_delayed_messages_by_topic_name(list(), binary()) -> emqx_rpc:erpc_multicall(ok). +delete_delayed_messages_by_topic_name(Nodes, TopicName) -> + erpc:multicall(Nodes, emqx_delayed, do_delete_delayed_messages_by_topic_name, [TopicName]).