Merge pull request #12296 from HJianBo/fix-delayed-message-http-api-500

fix(delayed): fix http 500 error
This commit is contained in:
JianBo He 2024-01-12 09:37:41 +08:00 committed by GitHub
commit 9e85b53c39
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 118 additions and 0 deletions

View File

@ -18,6 +18,7 @@
{emqx_dashboard,1}.
{emqx_delayed,1}.
{emqx_delayed,2}.
{emqx_delayed,3}.
{emqx_ds,1}.
{emqx_ds,2}.
{emqx_eviction_agent,1}.

View File

@ -65,6 +65,11 @@
cluster_list/1
]).
%% exports for internal rpc
-export([
do_delete_delayed_messages_by_topic_name/1
]).
%% exports for query
-export([
qs2ms/2,
@ -277,6 +282,37 @@ delete_delayed_message(Node, Id) ->
-spec delete_delayed_messages_by_topic_name(binary()) -> with_id_return().
delete_delayed_messages_by_topic_name(TopicName) when is_binary(TopicName) ->
Nodes = emqx:running_nodes(),
Result = emqx_delayed_proto_v3:delete_delayed_messages_by_topic_name(Nodes, TopicName),
case
lists:any(
fun
({ok, ok}) -> true;
(_) -> false
end,
Result
)
of
true ->
ok;
false ->
Errors = lists:filter(
fun
({ok, {error, not_found}}) -> false;
(_) -> true
end,
Result
),
case Errors of
[] ->
{error, not_found};
[Exception | _] ->
{error, Exception}
end
end.
-spec do_delete_delayed_messages_by_topic_name(binary()) -> with_id_return().
do_delete_delayed_messages_by_topic_name(TopicName) when is_binary(TopicName) ->
case ets:select(?TAB, ?DELETE_BY_TOPIC_MS(TopicName)) of
[] ->
{error, not_found};

View File

@ -329,6 +329,12 @@ generate_http_code_map(message_not_found, Topic) ->
message =>
iolist_to_binary(io_lib:format("Not found messages for ~s", [Topic]))
};
generate_http_code_map(invalid_topic_name, Topic) ->
#{
code => ?INVALID_TOPIC,
message =>
iolist_to_binary(io_lib:format("The topic name ~s is invalid", [Topic]))
};
generate_http_code_map(invalid_node, Node) ->
#{
code => ?INVALID_NODE,

View File

@ -0,0 +1,56 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_delayed_proto_v3).
-behaviour(emqx_bpapi).
-export([
introduced_in/0,
get_delayed_message/2,
delete_delayed_message/2,
%% Introduced in v2:
clear_all/1,
%% Introduced in v3:
delete_delayed_messages_by_topic_name/2
]).
-include_lib("emqx/include/bpapi.hrl").
introduced_in() ->
"5.5.0".
-spec get_delayed_message(node(), binary()) ->
emqx_delayed:with_id_return(map()) | emqx_rpc:badrpc().
get_delayed_message(Node, Id) ->
rpc:call(Node, emqx_delayed, get_delayed_message, [Id]).
-spec delete_delayed_message(node(), binary()) -> emqx_delayed:with_id_return() | emqx_rpc:badrpc().
delete_delayed_message(Node, Id) ->
rpc:call(Node, emqx_delayed, delete_delayed_message, [Id]).
%% Introduced in v2:
-spec clear_all([node()]) -> emqx_rpc:erpc_multicall(ok).
clear_all(Nodes) ->
erpc:multicall(Nodes, emqx_delayed, clear_all_local, []).
%% Introduced in v3:
-spec delete_delayed_messages_by_topic_name(list(), binary()) -> emqx_rpc:erpc_multicall(ok).
delete_delayed_messages_by_topic_name(Nodes, TopicName) ->
erpc:multicall(Nodes, emqx_delayed, do_delete_delayed_messages_by_topic_name, [TopicName]).

View File

@ -211,6 +211,25 @@ t_delete_messages_via_topic(_) ->
%% assert: messages are deleted
?assertEqual([], get_messages(0)),
%% assert: return 400 if the topic parameter is invalid
TopicFilter = uri_string:quote(<<"t/#">>),
?assertMatch(
{ok, 400, _},
request(
delete,
uri(["mqtt", "delayed", "messages", TopicFilter])
)
),
%% assert: return 404 if no messages found for the topic
?assertMatch(
{ok, 404, _},
request(
delete,
uri(["mqtt", "delayed", "messages", TopicInUrl])
)
),
ok.
t_large_payload(_) ->