Merge pull request #12277 from HJianBo/feat-delete-delayed-messages-via-topic
feat: support to delete delayed messages via topic name
This commit is contained in:
commit
adcda51eff
|
@ -58,6 +58,7 @@
|
||||||
get_delayed_message/2,
|
get_delayed_message/2,
|
||||||
delete_delayed_message/1,
|
delete_delayed_message/1,
|
||||||
delete_delayed_message/2,
|
delete_delayed_message/2,
|
||||||
|
delete_delayed_messages_by_topic_name/1,
|
||||||
clear_all/0,
|
clear_all/0,
|
||||||
%% rpc target
|
%% rpc target
|
||||||
clear_all_local/0,
|
clear_all_local/0,
|
||||||
|
@ -95,6 +96,13 @@
|
||||||
%% sync ms with record change
|
%% sync ms with record change
|
||||||
-define(QUERY_MS(Id), [{{delayed_message, {'_', Id}, '_', '_'}, [], ['$_']}]).
|
-define(QUERY_MS(Id), [{{delayed_message, {'_', Id}, '_', '_'}, [], ['$_']}]).
|
||||||
-define(DELETE_MS(Id), [{{delayed_message, {'$1', Id}, '_', '_'}, [], ['$1']}]).
|
-define(DELETE_MS(Id), [{{delayed_message, {'$1', Id}, '_', '_'}, [], ['$1']}]).
|
||||||
|
-define(DELETE_BY_TOPIC_MS(Topic), [
|
||||||
|
{
|
||||||
|
{delayed_message, '$1', '_', {message, '_', '_', '_', '_', '_', Topic, '_', '_', '_'}},
|
||||||
|
[],
|
||||||
|
['$1']
|
||||||
|
}
|
||||||
|
]).
|
||||||
|
|
||||||
-define(TAB, ?MODULE).
|
-define(TAB, ?MODULE).
|
||||||
-define(SERVER, ?MODULE).
|
-define(SERVER, ?MODULE).
|
||||||
|
@ -267,6 +275,15 @@ delete_delayed_message(Node, Id) when Node =:= node() ->
|
||||||
delete_delayed_message(Node, Id) ->
|
delete_delayed_message(Node, Id) ->
|
||||||
emqx_delayed_proto_v2:delete_delayed_message(Node, Id).
|
emqx_delayed_proto_v2:delete_delayed_message(Node, Id).
|
||||||
|
|
||||||
|
-spec delete_delayed_messages_by_topic_name(binary()) -> with_id_return().
|
||||||
|
delete_delayed_messages_by_topic_name(TopicName) when is_binary(TopicName) ->
|
||||||
|
case ets:select(?TAB, ?DELETE_BY_TOPIC_MS(TopicName)) of
|
||||||
|
[] ->
|
||||||
|
{error, not_found};
|
||||||
|
Rows ->
|
||||||
|
lists:foreach(fun(Key) -> mria:dirty_delete(?TAB, Key) end, Rows)
|
||||||
|
end.
|
||||||
|
|
||||||
-spec clear_all() -> ok.
|
-spec clear_all() -> ok.
|
||||||
clear_all() ->
|
clear_all() ->
|
||||||
Nodes = emqx:running_nodes(),
|
Nodes = emqx:running_nodes(),
|
||||||
|
|
|
@ -27,7 +27,8 @@
|
||||||
-export([
|
-export([
|
||||||
status/2,
|
status/2,
|
||||||
delayed_messages/2,
|
delayed_messages/2,
|
||||||
delayed_message/2
|
delayed_message/2,
|
||||||
|
delayed_message_topic/2
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export([
|
-export([
|
||||||
|
@ -51,6 +52,9 @@
|
||||||
-define(MESSAGE_ID_SCHEMA_ERROR, 'MESSAGE_ID_SCHEMA_ERROR').
|
-define(MESSAGE_ID_SCHEMA_ERROR, 'MESSAGE_ID_SCHEMA_ERROR').
|
||||||
-define(INVALID_NODE, 'INVALID_NODE').
|
-define(INVALID_NODE, 'INVALID_NODE').
|
||||||
|
|
||||||
|
-define(INVALID_TOPIC, 'INVALID_TOPIC_NAME').
|
||||||
|
-define(MESSAGE_NOT_FOUND, 'MESSAGE_NOT_FOUND').
|
||||||
|
|
||||||
api_spec() ->
|
api_spec() ->
|
||||||
emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}).
|
emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}).
|
||||||
|
|
||||||
|
@ -58,6 +62,7 @@ paths() ->
|
||||||
[
|
[
|
||||||
"/mqtt/delayed",
|
"/mqtt/delayed",
|
||||||
"/mqtt/delayed/messages",
|
"/mqtt/delayed/messages",
|
||||||
|
"/mqtt/delayed/messages/:topic",
|
||||||
"/mqtt/delayed/messages/:node/:msgid"
|
"/mqtt/delayed/messages/:node/:msgid"
|
||||||
].
|
].
|
||||||
|
|
||||||
|
@ -87,6 +92,32 @@ schema("/mqtt/delayed") ->
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
schema("/mqtt/delayed/messages/:topic") ->
|
||||||
|
#{
|
||||||
|
'operationId' => delayed_message_topic,
|
||||||
|
delete => #{
|
||||||
|
tags => ?API_TAG_MQTT,
|
||||||
|
description => ?DESC(delete_api),
|
||||||
|
parameters => [
|
||||||
|
{topic,
|
||||||
|
mk(
|
||||||
|
binary(),
|
||||||
|
#{in => path, desc => ?DESC(topic)}
|
||||||
|
)}
|
||||||
|
],
|
||||||
|
responses => #{
|
||||||
|
204 => <<"Delete delayed message success">>,
|
||||||
|
400 => emqx_dashboard_swagger:error_codes(
|
||||||
|
[?INVALID_TOPIC],
|
||||||
|
?DESC(bad_topic_name)
|
||||||
|
),
|
||||||
|
404 => emqx_dashboard_swagger:error_codes(
|
||||||
|
[?MESSAGE_NOT_FOUND],
|
||||||
|
?DESC(no_delayed_message)
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
schema("/mqtt/delayed/messages/:node/:msgid") ->
|
schema("/mqtt/delayed/messages/:node/:msgid") ->
|
||||||
#{
|
#{
|
||||||
'operationId' => delayed_message,
|
'operationId' => delayed_message,
|
||||||
|
@ -223,6 +254,19 @@ delayed_message(delete, #{bindings := #{node := NodeBin, msgid := HexId}}) ->
|
||||||
end
|
end
|
||||||
).
|
).
|
||||||
|
|
||||||
|
delayed_message_topic(delete, #{bindings := #{topic := Topic}}) ->
|
||||||
|
MaybeTopic = make_maybe(Topic, invalid_topic_name, fun validate_topic_name/1),
|
||||||
|
with_maybe(
|
||||||
|
[MaybeTopic],
|
||||||
|
fun(TopicName) ->
|
||||||
|
case emqx_delayed:delete_delayed_messages_by_topic_name(TopicName) of
|
||||||
|
ok ->
|
||||||
|
{204};
|
||||||
|
{error, not_found} ->
|
||||||
|
{404, generate_http_code_map(message_not_found, TopicName)}
|
||||||
|
end
|
||||||
|
end
|
||||||
|
).
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% internal function
|
%% internal function
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -279,6 +323,12 @@ generate_http_code_map(not_found, Id) ->
|
||||||
message =>
|
message =>
|
||||||
iolist_to_binary(io_lib:format("Message ID ~s not found", [Id]))
|
iolist_to_binary(io_lib:format("Message ID ~s not found", [Id]))
|
||||||
};
|
};
|
||||||
|
generate_http_code_map(message_not_found, Topic) ->
|
||||||
|
#{
|
||||||
|
code => ?MESSAGE_NOT_FOUND,
|
||||||
|
message =>
|
||||||
|
iolist_to_binary(io_lib:format("Not found messages for ~s", [Topic]))
|
||||||
|
};
|
||||||
generate_http_code_map(invalid_node, Node) ->
|
generate_http_code_map(invalid_node, Node) ->
|
||||||
#{
|
#{
|
||||||
code => ?INVALID_NODE,
|
code => ?INVALID_NODE,
|
||||||
|
@ -295,6 +345,14 @@ make_maybe(X, Error, Fun) ->
|
||||||
{left, X, Error}
|
{left, X, Error}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
validate_topic_name(Topic) ->
|
||||||
|
case emqx_topic:wildcard(Topic) of
|
||||||
|
true ->
|
||||||
|
error(badarg);
|
||||||
|
false ->
|
||||||
|
Topic
|
||||||
|
end.
|
||||||
|
|
||||||
with_maybe(Maybes, Cont) ->
|
with_maybe(Maybes, Cont) ->
|
||||||
with_maybe(Maybes, Cont, []).
|
with_maybe(Maybes, Cont, []).
|
||||||
|
|
||||||
|
|
|
@ -189,6 +189,30 @@ t_messages(_) ->
|
||||||
|
|
||||||
ok = emqtt:disconnect(C1).
|
ok = emqtt:disconnect(C1).
|
||||||
|
|
||||||
|
t_delete_messages_via_topic(_) ->
|
||||||
|
clear_all_record(),
|
||||||
|
emqx_delayed:load(),
|
||||||
|
|
||||||
|
OriginTopic = <<"t/a">>,
|
||||||
|
Topic = <<"$delayed/123/", OriginTopic/binary>>,
|
||||||
|
|
||||||
|
publish_a_delayed_message(Topic),
|
||||||
|
publish_a_delayed_message(Topic),
|
||||||
|
|
||||||
|
%% assert: delayed messages are saved
|
||||||
|
?assertMatch([_, _], get_messages(2)),
|
||||||
|
|
||||||
|
%% delete these messages via topic
|
||||||
|
TopicInUrl = uri_string:quote(OriginTopic),
|
||||||
|
{ok, 204, _} = request(
|
||||||
|
delete,
|
||||||
|
uri(["mqtt", "delayed", "messages", TopicInUrl])
|
||||||
|
),
|
||||||
|
|
||||||
|
%% assert: messages are deleted
|
||||||
|
?assertEqual([], get_messages(0)),
|
||||||
|
ok.
|
||||||
|
|
||||||
t_large_payload(_) ->
|
t_large_payload(_) ->
|
||||||
clear_all_record(),
|
clear_all_record(),
|
||||||
emqx_delayed:load(),
|
emqx_delayed:load(),
|
||||||
|
@ -246,3 +270,14 @@ get_messages(Len) ->
|
||||||
)
|
)
|
||||||
),
|
),
|
||||||
Msgs.
|
Msgs.
|
||||||
|
|
||||||
|
publish_a_delayed_message(Topic) ->
|
||||||
|
{ok, C1} = emqtt:start_link([{clean_start, true}]),
|
||||||
|
{ok, _} = emqtt:connect(C1),
|
||||||
|
emqtt:publish(
|
||||||
|
C1,
|
||||||
|
Topic,
|
||||||
|
<<"This is a delayed messages">>,
|
||||||
|
[{qos, 1}]
|
||||||
|
),
|
||||||
|
ok = emqtt:disconnect(C1).
|
||||||
|
|
|
@ -0,0 +1 @@
|
||||||
|
Add `mqtt/delayed/messages/:topic` endpoint to remove delayed messages by topic name.
|
|
@ -65,6 +65,16 @@ msgid_not_found.desc:
|
||||||
msgid_not_found.label:
|
msgid_not_found.label:
|
||||||
"""Message ID not found"""
|
"""Message ID not found"""
|
||||||
|
|
||||||
|
bad_topic_name.desc:
|
||||||
|
"""Bad Topic Name"""
|
||||||
|
bad_topic_name.label:
|
||||||
|
"""Bad Topic Name"""
|
||||||
|
|
||||||
|
no_delayed_message.desc:
|
||||||
|
"""Not found delayed message for this topic"""
|
||||||
|
no_delayed_message.label:
|
||||||
|
"""Not found delayed message for this topic"""
|
||||||
|
|
||||||
node.desc:
|
node.desc:
|
||||||
"""The node where message from"""
|
"""The node where message from"""
|
||||||
node.label:
|
node.label:
|
||||||
|
|
Loading…
Reference in New Issue