Merge pull request #7328 from lafirest/fix/delayed_lookup
fix(delayed): make it possible to lookup/delete other node's delayed …
This commit is contained in:
commit
d231b2222e
|
@ -20,3 +20,4 @@
|
|||
{emqx_statsd,1}.
|
||||
{emqx_telemetry,1}.
|
||||
{emqx_topic_metrics,1}.
|
||||
{emqx_delayed,1}.
|
||||
|
|
|
@ -47,7 +47,9 @@
|
|||
, update_config/1
|
||||
, list/1
|
||||
, get_delayed_message/1
|
||||
, get_delayed_message/2
|
||||
, delete_delayed_message/1
|
||||
, delete_delayed_message/2
|
||||
, post_config_update/5
|
||||
, cluster_list/1
|
||||
, cluster_query/4
|
||||
|
@ -57,6 +59,10 @@
|
|||
|
||||
-record(delayed_message, {key, delayed, msg}).
|
||||
-type delayed_message() :: #delayed_message{}.
|
||||
-type with_id_return() :: ok | {error, not_found}.
|
||||
-type with_id_return(T) :: {ok, T} | {error, not_found}.
|
||||
|
||||
-export_type([with_id_return/0, with_id_return/1]).
|
||||
|
||||
|
||||
-type state() :: #{ publish_timer := maybe(timer:tref())
|
||||
|
@ -164,6 +170,7 @@ format_delayed(#delayed_message{key = {ExpectTimeStamp, Id}, delayed = Delayed,
|
|||
RemainingTime = ExpectTimeStamp - erlang:system_time(second),
|
||||
Result = #{
|
||||
msgid => emqx_guid:to_hexstr(Id),
|
||||
node => node(),
|
||||
publish_at => PublishTime,
|
||||
delayed_interval => Delayed,
|
||||
delayed_remaining => RemainingTime,
|
||||
|
@ -183,22 +190,24 @@ format_delayed(#delayed_message{key = {ExpectTimeStamp, Id}, delayed = Delayed,
|
|||
to_rfc3339(Timestamp) ->
|
||||
list_to_binary(calendar:system_time_to_rfc3339(Timestamp, [{unit, second}])).
|
||||
|
||||
get_delayed_message(Id0) ->
|
||||
try emqx_guid:from_hexstr(Id0) of
|
||||
Id ->
|
||||
case ets:select(?TAB, ?QUERY_MS(Id)) of
|
||||
[] ->
|
||||
{error, not_found};
|
||||
Rows ->
|
||||
Message = hd(Rows),
|
||||
{ok, format_delayed(Message, true)}
|
||||
end
|
||||
catch
|
||||
error:function_clause -> {error, id_schema_error}
|
||||
-spec get_delayed_message(binary()) -> with_id_return(map()).
|
||||
get_delayed_message(Id) ->
|
||||
case ets:select(?TAB, ?QUERY_MS(Id)) of
|
||||
[] ->
|
||||
{error, not_found};
|
||||
Rows ->
|
||||
Message = hd(Rows),
|
||||
{ok, format_delayed(Message, true)}
|
||||
end.
|
||||
|
||||
delete_delayed_message(Id0) ->
|
||||
Id = emqx_guid:from_hexstr(Id0),
|
||||
get_delayed_message(Node, Id) when Node =:= node() ->
|
||||
get_delayed_message(Id);
|
||||
|
||||
get_delayed_message(Node, Id) ->
|
||||
emqx_delayed_proto_v1:get_delayed_message(Node, Id).
|
||||
|
||||
-spec delete_delayed_message(binary()) -> with_id_return().
|
||||
delete_delayed_message(Id) ->
|
||||
case ets:select(?TAB, ?DELETE_MS(Id)) of
|
||||
[] ->
|
||||
{error, not_found};
|
||||
|
@ -206,6 +215,12 @@ delete_delayed_message(Id0) ->
|
|||
Timestamp = hd(Rows),
|
||||
mria:dirty_delete(?TAB, {Timestamp, Id})
|
||||
end.
|
||||
|
||||
delete_delayed_message(Node, Id) when Node =:= node() ->
|
||||
delete_delayed_message(Id);
|
||||
delete_delayed_message(Node, Id) ->
|
||||
emqx_delayed_proto_v1:delete_delayed_message(Node, Id).
|
||||
|
||||
update_config(Config) ->
|
||||
emqx_conf:update([delayed], Config, #{rawconf_with_defaults => true, override_to => cluster}).
|
||||
|
||||
|
|
|
@ -40,14 +40,12 @@
|
|||
|
||||
-export([api_spec/0]).
|
||||
|
||||
-define(ALREADY_ENABLED, 'ALREADY_ENABLED').
|
||||
-define(ALREADY_DISABLED, 'ALREADY_DISABLED').
|
||||
|
||||
-define(INTERNAL_ERROR, 'INTERNAL_ERROR').
|
||||
-define(BAD_REQUEST, 'BAD_REQUEST').
|
||||
|
||||
-define(MESSAGE_ID_NOT_FOUND, 'MESSAGE_ID_NOT_FOUND').
|
||||
-define(MESSAGE_ID_SCHEMA_ERROR, 'MESSAGE_ID_SCHEMA_ERROR').
|
||||
-define(INVALID_NODE, 'INVALID_NODE').
|
||||
-define(MAX_PAYLOAD_SIZE, 1048576). %% 1MB = 1024 x 1024
|
||||
|
||||
api_spec() ->
|
||||
|
@ -56,7 +54,7 @@ api_spec() ->
|
|||
paths() ->
|
||||
[ "/mqtt/delayed"
|
||||
, "/mqtt/delayed/messages"
|
||||
, "/mqtt/delayed/messages/:msgid"
|
||||
, "/mqtt/delayed/messages/:node/:msgid"
|
||||
].
|
||||
|
||||
schema("/mqtt/delayed") ->
|
||||
|
@ -83,12 +81,16 @@ schema("/mqtt/delayed") ->
|
|||
}
|
||||
};
|
||||
|
||||
schema("/mqtt/delayed/messages/:msgid") ->
|
||||
schema("/mqtt/delayed/messages/:node/:msgid") ->
|
||||
#{'operationId' => delayed_message,
|
||||
get => #{
|
||||
tags => ?API_TAG_MQTT,
|
||||
description => <<"Get delayed message">>,
|
||||
parameters => [{msgid, mk(binary(), #{in => path, desc => <<"delay message ID">>})}],
|
||||
parameters => [ {node,
|
||||
mk(binary(),
|
||||
#{in => path, desc => <<"The node where message from">>})}
|
||||
, {msgid, mk(binary(), #{in => path, desc => <<"Delay message ID">>})}
|
||||
],
|
||||
responses => #{
|
||||
200 => ref("message_without_payload"),
|
||||
400 => emqx_dashboard_swagger:error_codes( [?MESSAGE_ID_SCHEMA_ERROR]
|
||||
|
@ -100,7 +102,12 @@ schema("/mqtt/delayed/messages/:msgid") ->
|
|||
delete => #{
|
||||
tags => ?API_TAG_MQTT,
|
||||
description => <<"Delete delayed message">>,
|
||||
parameters => [{msgid, mk(binary(), #{in => path, desc => <<"delay message ID">>})}],
|
||||
parameters => [ {node,
|
||||
mk(binary(),
|
||||
#{in => path, desc => <<"The node where message from">>})}
|
||||
, {msgid,
|
||||
mk(binary(), #{in => path, desc => <<"Delay message ID">>})}
|
||||
],
|
||||
responses => #{
|
||||
204 => <<"Delete delayed message success">>,
|
||||
400 => emqx_dashboard_swagger:error_codes( [?MESSAGE_ID_SCHEMA_ERROR]
|
||||
|
@ -133,15 +140,16 @@ schema("/mqtt/delayed/messages") ->
|
|||
|
||||
fields("message_without_payload") ->
|
||||
[
|
||||
{msgid, mk(integer(), #{desc => <<"Message Id (MQTT message id hash)">>})},
|
||||
{publish_at, mk(binary(), #{desc => <<"Client publish message time, rfc 3339">>})},
|
||||
{delayed_interval, mk(integer(), #{desc => <<"Delayed interval, second">>})},
|
||||
{delayed_remaining, mk(integer(), #{desc => <<"Delayed remaining, second">>})},
|
||||
{expected_at, mk(binary(), #{desc => <<"Expect publish time, rfc 3339">>})},
|
||||
{topic, mk(binary(), #{desc => <<"Topic">>, example => <<"/sys/#">>})},
|
||||
{qos, mk(binary(), #{desc => <<"QoS">>})},
|
||||
{from_clientid, mk(binary(), #{desc => <<"From ClientId">>})},
|
||||
{from_username, mk(binary(), #{desc => <<"From Username">>})}
|
||||
{msgid, mk(integer(), #{desc => <<"Message Id (MQTT message id hash)">>})},
|
||||
{node, mk(binary(), #{desc => <<"The node where message from">>})},
|
||||
{publish_at, mk(binary(), #{desc => <<"Client publish message time, rfc 3339">>})},
|
||||
{delayed_interval, mk(integer(), #{desc => <<"Delayed interval, second">>})},
|
||||
{delayed_remaining, mk(integer(), #{desc => <<"Delayed remaining, second">>})},
|
||||
{expected_at, mk(binary(), #{desc => <<"Expect publish time, rfc 3339">>})},
|
||||
{topic, mk(binary(), #{desc => <<"Topic">>, example => <<"/sys/#">>})},
|
||||
{qos, mk(binary(), #{desc => <<"QoS">>})},
|
||||
{from_clientid, mk(binary(), #{desc => <<"From ClientId">>})},
|
||||
{from_username, mk(binary(), #{desc => <<"From Username">>})}
|
||||
];
|
||||
fields("message") ->
|
||||
PayloadDesc = io_lib:format(
|
||||
|
@ -162,31 +170,37 @@ status(put, #{body := Body}) ->
|
|||
delayed_messages(get, #{query_string := Qs}) ->
|
||||
{200, emqx_delayed:cluster_list(Qs)}.
|
||||
|
||||
delayed_message(get, #{bindings := #{msgid := Id}}) ->
|
||||
case emqx_delayed:get_delayed_message(Id) of
|
||||
{ok, Message} ->
|
||||
Payload = maps:get(payload, Message),
|
||||
case erlang:byte_size(Payload) > ?MAX_PAYLOAD_SIZE of
|
||||
true ->
|
||||
{200, Message};
|
||||
_ ->
|
||||
{200, Message#{payload => base64:encode(Payload)}}
|
||||
end;
|
||||
{error, id_schema_error} ->
|
||||
{400, generate_http_code_map(id_schema_error, Id)};
|
||||
{error, not_found} ->
|
||||
{404, generate_http_code_map(not_found, Id)}
|
||||
end;
|
||||
delayed_message(delete, #{bindings := #{msgid := Id}}) ->
|
||||
case emqx_delayed:get_delayed_message(Id) of
|
||||
{ok, _Message} ->
|
||||
_ = emqx_delayed:delete_delayed_message(Id),
|
||||
{204};
|
||||
{error, id_schema_error} ->
|
||||
{400, generate_http_code_map(id_schema_error, Id)};
|
||||
{error, not_found} ->
|
||||
{404, generate_http_code_map(not_found, Id)}
|
||||
end.
|
||||
delayed_message(get, #{bindings := #{node := NodeBin, msgid := HexId}}) ->
|
||||
MaybeNode = make_maybe(NodeBin, invalid_node, fun erlang:binary_to_atom/1),
|
||||
MaybeId = make_maybe(HexId, id_schema_error, fun emqx_guid:from_hexstr/1),
|
||||
with_maybe([MaybeNode, MaybeId],
|
||||
fun(Node, Id) ->
|
||||
case emqx_delayed:get_delayed_message(Node, Id) of
|
||||
{ok, Message} ->
|
||||
Payload = maps:get(payload, Message),
|
||||
case erlang:byte_size(Payload) > ?MAX_PAYLOAD_SIZE of
|
||||
true ->
|
||||
{200, Message};
|
||||
_ ->
|
||||
{200, Message#{payload => base64:encode(Payload)}}
|
||||
end;
|
||||
{error, not_found} ->
|
||||
{404, generate_http_code_map(not_found, Id)}
|
||||
end
|
||||
end);
|
||||
|
||||
delayed_message(delete, #{bindings := #{node := NodeBin, msgid := HexId}}) ->
|
||||
MaybeNode = make_maybe(NodeBin, invalid_node, fun erlang:binary_to_atom/1),
|
||||
MaybeId = make_maybe(HexId, id_schema_error, fun emqx_guid:from_hexstr/1),
|
||||
with_maybe([MaybeNode, MaybeId],
|
||||
fun(Node, Id) ->
|
||||
case emqx_delayed:delete_delayed_message(Node, Id) of
|
||||
ok ->
|
||||
{204};
|
||||
{error, not_found} ->
|
||||
{404, generate_http_code_map(not_found, Id)}
|
||||
end
|
||||
end).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% internal function
|
||||
|
@ -236,4 +250,25 @@ generate_http_code_map(id_schema_error, Id) ->
|
|||
iolist_to_binary(io_lib:format("Message ID ~p schema error", [Id]))};
|
||||
generate_http_code_map(not_found, Id) ->
|
||||
#{code => ?MESSAGE_ID_NOT_FOUND, message =>
|
||||
iolist_to_binary(io_lib:format("Message ID ~p not found", [Id]))}.
|
||||
iolist_to_binary(io_lib:format("Message ID ~p not found", [Id]))};
|
||||
generate_http_code_map(invalid_node, Node) ->
|
||||
#{code => ?INVALID_NODE, message =>
|
||||
iolist_to_binary(io_lib:format("The node name ~p is invalid", [Node]))}.
|
||||
|
||||
make_maybe(X, Error, Fun) ->
|
||||
try Fun(X) of
|
||||
Right ->
|
||||
Right
|
||||
catch _:_ ->
|
||||
{left, X, Error}
|
||||
end.
|
||||
|
||||
with_maybe(Maybes, Cont) ->
|
||||
with_maybe(Maybes, Cont, []).
|
||||
|
||||
with_maybe([], Cont, Rights) ->
|
||||
erlang:apply(Cont, lists:reverse(Rights));
|
||||
with_maybe([{left, X, Error} | _], _Cont, _Rights) ->
|
||||
{400, generate_http_code_map(Error, X)};
|
||||
with_maybe([Right | T], Cont, Rights) ->
|
||||
with_maybe(T, Cont, [Right | Rights]).
|
||||
|
|
|
@ -0,0 +1,37 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%%Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_delayed_proto_v1).
|
||||
|
||||
-behaviour(emqx_bpapi).
|
||||
|
||||
-export([ introduced_in/0
|
||||
, get_delayed_message/2
|
||||
, delete_delayed_message/2
|
||||
]).
|
||||
|
||||
-include_lib("emqx/include/bpapi.hrl").
|
||||
|
||||
introduced_in() ->
|
||||
"5.0.0".
|
||||
|
||||
-spec get_delayed_message(node(), binary()) -> emqx_delayed:with_id_return(map()) | emqx_rpc:badrpc().
|
||||
get_delayed_message(Node, HexId) ->
|
||||
rpc:call(Node, emqx_delayed, get_delayed_message, [HexId]).
|
||||
|
||||
-spec delete_delayed_message(node(), binary()) -> emqx_delayed:with_id_return() | emqx_rpc:badrpc().
|
||||
delete_delayed_message(Node, HexId) ->
|
||||
rpc:call(Node, emqx_delayed, delete_delayed_message, [HexId]).
|
Loading…
Reference in New Issue