fix(delayed): delete delayed messge on all nodes

This commit is contained in:
JianBo He 2024-01-11 14:12:30 +08:00
parent eea0ec135f
commit c4dd083fd9
2 changed files with 92 additions and 0 deletions

View File

@ -65,6 +65,11 @@
cluster_list/1 cluster_list/1
]). ]).
%% exports for internal rpc
-export([
do_delete_delayed_messages_by_topic_name/1
]).
%% exports for query %% exports for query
-export([ -export([
qs2ms/2, qs2ms/2,
@ -277,6 +282,37 @@ delete_delayed_message(Node, Id) ->
-spec delete_delayed_messages_by_topic_name(binary()) -> with_id_return(). -spec delete_delayed_messages_by_topic_name(binary()) -> with_id_return().
delete_delayed_messages_by_topic_name(TopicName) when is_binary(TopicName) -> delete_delayed_messages_by_topic_name(TopicName) when is_binary(TopicName) ->
Nodes = emqx:running_nodes(),
Result = emqx_delayed_proto_v3:delete_delayed_messages_by_topic_name(Nodes, TopicName),
case
lists:any(
fun
({ok, ok}) -> true;
(_) -> false
end,
Result
)
of
true ->
ok;
false ->
Errors = lists:filter(
fun
({ok, {error, not_found}}) -> false;
(_) -> true
end,
Result
),
case Errors of
[] ->
{error, not_found};
[Exception | _] ->
{error, Exception}
end
end.
-spec do_delete_delayed_messages_by_topic_name(binary()) -> with_id_return().
do_delete_delayed_messages_by_topic_name(TopicName) when is_binary(TopicName) ->
case ets:select(?TAB, ?DELETE_BY_TOPIC_MS(TopicName)) of case ets:select(?TAB, ?DELETE_BY_TOPIC_MS(TopicName)) of
[] -> [] ->
{error, not_found}; {error, not_found};

View File

@ -0,0 +1,56 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_delayed_proto_v3).
-behaviour(emqx_bpapi).
-export([
introduced_in/0,
get_delayed_message/2,
delete_delayed_message/2,
%% Introduced in v2:
clear_all/1,
%% Introduced in v3:
delete_delayed_messages_by_topic_name/2
]).
-include_lib("emqx/include/bpapi.hrl").
introduced_in() ->
"5.5.0".
-spec get_delayed_message(node(), binary()) ->
emqx_delayed:with_id_return(map()) | emqx_rpc:badrpc().
get_delayed_message(Node, Id) ->
rpc:call(Node, emqx_delayed, get_delayed_message, [Id]).
-spec delete_delayed_message(node(), binary()) -> emqx_delayed:with_id_return() | emqx_rpc:badrpc().
delete_delayed_message(Node, Id) ->
rpc:call(Node, emqx_delayed, delete_delayed_message, [Id]).
%% Introduced in v2:
-spec clear_all([node()]) -> emqx_rpc:erpc_multicall(ok).
clear_all(Nodes) ->
erpc:multicall(Nodes, emqx_delayed, clear_all_local, []).
%% Introduced in v3:
-spec delete_delayed_messages_by_topic_name(list(), binary()) -> emqx_rpc:erpc_multicall(ok).
delete_delayed_messages_by_topic_name(Nodes, TopicName) ->
erpc:multicall(Nodes, emqx_delayed, do_delete_delayed_messages_by_topic_name, [TopicName]).