diff --git a/apps/emqx/priv/bpapi.versions b/apps/emqx/priv/bpapi.versions index 4b54097fe..c31650b07 100644 --- a/apps/emqx/priv/bpapi.versions +++ b/apps/emqx/priv/bpapi.versions @@ -20,3 +20,4 @@ {emqx_statsd,1}. {emqx_telemetry,1}. {emqx_topic_metrics,1}. +{emqx_delayed,1}. diff --git a/apps/emqx_modules/src/emqx_delayed.erl b/apps/emqx_modules/src/emqx_delayed.erl index 317942122..4fee7a000 100644 --- a/apps/emqx_modules/src/emqx_delayed.erl +++ b/apps/emqx_modules/src/emqx_delayed.erl @@ -47,7 +47,9 @@ , update_config/1 , list/1 , get_delayed_message/1 + , get_delayed_message/2 , delete_delayed_message/1 + , delete_delayed_message/2 , post_config_update/5 , cluster_list/1 , cluster_query/4 @@ -57,6 +59,10 @@ -record(delayed_message, {key, delayed, msg}). -type delayed_message() :: #delayed_message{}. +-type with_id_return() :: ok | {error, not_found}. +-type with_id_return(T) :: {ok, T} | {error, not_found}. + +-export_type([with_id_return/0, with_id_return/1]). -type state() :: #{ publish_timer := maybe(timer:tref()) @@ -164,6 +170,7 @@ format_delayed(#delayed_message{key = {ExpectTimeStamp, Id}, delayed = Delayed, RemainingTime = ExpectTimeStamp - erlang:system_time(second), Result = #{ msgid => emqx_guid:to_hexstr(Id), + node => node(), publish_at => PublishTime, delayed_interval => Delayed, delayed_remaining => RemainingTime, @@ -183,22 +190,24 @@ format_delayed(#delayed_message{key = {ExpectTimeStamp, Id}, delayed = Delayed, to_rfc3339(Timestamp) -> list_to_binary(calendar:system_time_to_rfc3339(Timestamp, [{unit, second}])). -get_delayed_message(Id0) -> - try emqx_guid:from_hexstr(Id0) of - Id -> - case ets:select(?TAB, ?QUERY_MS(Id)) of - [] -> - {error, not_found}; - Rows -> - Message = hd(Rows), - {ok, format_delayed(Message, true)} - end - catch - error:function_clause -> {error, id_schema_error} +-spec get_delayed_message(binary()) -> with_id_return(map()). +get_delayed_message(Id) -> + case ets:select(?TAB, ?QUERY_MS(Id)) of + [] -> + {error, not_found}; + Rows -> + Message = hd(Rows), + {ok, format_delayed(Message, true)} end. -delete_delayed_message(Id0) -> - Id = emqx_guid:from_hexstr(Id0), +get_delayed_message(Node, Id) when Node =:= node() -> + get_delayed_message(Id); + +get_delayed_message(Node, Id) -> + emqx_delayed_proto_v1:get_delayed_message(Node, Id). + +-spec delete_delayed_message(binary()) -> with_id_return(). +delete_delayed_message(Id) -> case ets:select(?TAB, ?DELETE_MS(Id)) of [] -> {error, not_found}; @@ -206,6 +215,12 @@ delete_delayed_message(Id0) -> Timestamp = hd(Rows), mria:dirty_delete(?TAB, {Timestamp, Id}) end. + +delete_delayed_message(Node, Id) when Node =:= node() -> + delete_delayed_message(Id); +delete_delayed_message(Node, Id) -> + emqx_delayed_proto_v1:delete_delayed_message(Node, Id). + update_config(Config) -> emqx_conf:update([delayed], Config, #{rawconf_with_defaults => true, override_to => cluster}). diff --git a/apps/emqx_modules/src/emqx_delayed_api.erl b/apps/emqx_modules/src/emqx_delayed_api.erl index b6756c695..061bdc472 100644 --- a/apps/emqx_modules/src/emqx_delayed_api.erl +++ b/apps/emqx_modules/src/emqx_delayed_api.erl @@ -40,14 +40,12 @@ -export([api_spec/0]). --define(ALREADY_ENABLED, 'ALREADY_ENABLED'). --define(ALREADY_DISABLED, 'ALREADY_DISABLED'). - -define(INTERNAL_ERROR, 'INTERNAL_ERROR'). -define(BAD_REQUEST, 'BAD_REQUEST'). -define(MESSAGE_ID_NOT_FOUND, 'MESSAGE_ID_NOT_FOUND'). -define(MESSAGE_ID_SCHEMA_ERROR, 'MESSAGE_ID_SCHEMA_ERROR'). +-define(INVALID_NODE, 'INVALID_NODE'). -define(MAX_PAYLOAD_SIZE, 1048576). %% 1MB = 1024 x 1024 api_spec() -> @@ -56,7 +54,7 @@ api_spec() -> paths() -> [ "/mqtt/delayed" , "/mqtt/delayed/messages" - , "/mqtt/delayed/messages/:msgid" + , "/mqtt/delayed/messages/:node/:msgid" ]. schema("/mqtt/delayed") -> @@ -83,12 +81,16 @@ schema("/mqtt/delayed") -> } }; -schema("/mqtt/delayed/messages/:msgid") -> +schema("/mqtt/delayed/messages/:node/:msgid") -> #{'operationId' => delayed_message, get => #{ tags => ?API_TAG_MQTT, description => <<"Get delayed message">>, - parameters => [{msgid, mk(binary(), #{in => path, desc => <<"delay message ID">>})}], + parameters => [ {node, + mk(binary(), + #{in => path, desc => <<"The node where message from">>})} + , {msgid, mk(binary(), #{in => path, desc => <<"Delay message ID">>})} + ], responses => #{ 200 => ref("message_without_payload"), 400 => emqx_dashboard_swagger:error_codes( [?MESSAGE_ID_SCHEMA_ERROR] @@ -100,7 +102,12 @@ schema("/mqtt/delayed/messages/:msgid") -> delete => #{ tags => ?API_TAG_MQTT, description => <<"Delete delayed message">>, - parameters => [{msgid, mk(binary(), #{in => path, desc => <<"delay message ID">>})}], + parameters => [ {node, + mk(binary(), + #{in => path, desc => <<"The node where message from">>})} + , {msgid, + mk(binary(), #{in => path, desc => <<"Delay message ID">>})} + ], responses => #{ 204 => <<"Delete delayed message success">>, 400 => emqx_dashboard_swagger:error_codes( [?MESSAGE_ID_SCHEMA_ERROR] @@ -133,15 +140,16 @@ schema("/mqtt/delayed/messages") -> fields("message_without_payload") -> [ - {msgid, mk(integer(), #{desc => <<"Message Id (MQTT message id hash)">>})}, - {publish_at, mk(binary(), #{desc => <<"Client publish message time, rfc 3339">>})}, - {delayed_interval, mk(integer(), #{desc => <<"Delayed interval, second">>})}, - {delayed_remaining, mk(integer(), #{desc => <<"Delayed remaining, second">>})}, - {expected_at, mk(binary(), #{desc => <<"Expect publish time, rfc 3339">>})}, - {topic, mk(binary(), #{desc => <<"Topic">>, example => <<"/sys/#">>})}, - {qos, mk(binary(), #{desc => <<"QoS">>})}, - {from_clientid, mk(binary(), #{desc => <<"From ClientId">>})}, - {from_username, mk(binary(), #{desc => <<"From Username">>})} + {msgid, mk(integer(), #{desc => <<"Message Id (MQTT message id hash)">>})}, + {node, mk(binary(), #{desc => <<"The node where message from">>})}, + {publish_at, mk(binary(), #{desc => <<"Client publish message time, rfc 3339">>})}, + {delayed_interval, mk(integer(), #{desc => <<"Delayed interval, second">>})}, + {delayed_remaining, mk(integer(), #{desc => <<"Delayed remaining, second">>})}, + {expected_at, mk(binary(), #{desc => <<"Expect publish time, rfc 3339">>})}, + {topic, mk(binary(), #{desc => <<"Topic">>, example => <<"/sys/#">>})}, + {qos, mk(binary(), #{desc => <<"QoS">>})}, + {from_clientid, mk(binary(), #{desc => <<"From ClientId">>})}, + {from_username, mk(binary(), #{desc => <<"From Username">>})} ]; fields("message") -> PayloadDesc = io_lib:format( @@ -162,31 +170,37 @@ status(put, #{body := Body}) -> delayed_messages(get, #{query_string := Qs}) -> {200, emqx_delayed:cluster_list(Qs)}. -delayed_message(get, #{bindings := #{msgid := Id}}) -> - case emqx_delayed:get_delayed_message(Id) of - {ok, Message} -> - Payload = maps:get(payload, Message), - case erlang:byte_size(Payload) > ?MAX_PAYLOAD_SIZE of - true -> - {200, Message}; - _ -> - {200, Message#{payload => base64:encode(Payload)}} - end; - {error, id_schema_error} -> - {400, generate_http_code_map(id_schema_error, Id)}; - {error, not_found} -> - {404, generate_http_code_map(not_found, Id)} - end; -delayed_message(delete, #{bindings := #{msgid := Id}}) -> - case emqx_delayed:get_delayed_message(Id) of - {ok, _Message} -> - _ = emqx_delayed:delete_delayed_message(Id), - {204}; - {error, id_schema_error} -> - {400, generate_http_code_map(id_schema_error, Id)}; - {error, not_found} -> - {404, generate_http_code_map(not_found, Id)} - end. +delayed_message(get, #{bindings := #{node := NodeBin, msgid := HexId}}) -> + MaybeNode = make_maybe(NodeBin, invalid_node, fun erlang:binary_to_atom/1), + MaybeId = make_maybe(HexId, id_schema_error, fun emqx_guid:from_hexstr/1), + with_maybe([MaybeNode, MaybeId], + fun(Node, Id) -> + case emqx_delayed:get_delayed_message(Node, Id) of + {ok, Message} -> + Payload = maps:get(payload, Message), + case erlang:byte_size(Payload) > ?MAX_PAYLOAD_SIZE of + true -> + {200, Message}; + _ -> + {200, Message#{payload => base64:encode(Payload)}} + end; + {error, not_found} -> + {404, generate_http_code_map(not_found, Id)} + end + end); + +delayed_message(delete, #{bindings := #{node := NodeBin, msgid := HexId}}) -> + MaybeNode = make_maybe(NodeBin, invalid_node, fun erlang:binary_to_atom/1), + MaybeId = make_maybe(HexId, id_schema_error, fun emqx_guid:from_hexstr/1), + with_maybe([MaybeNode, MaybeId], + fun(Node, Id) -> + case emqx_delayed:delete_delayed_message(Node, Id) of + ok -> + {204}; + {error, not_found} -> + {404, generate_http_code_map(not_found, Id)} + end + end). %%-------------------------------------------------------------------- %% internal function @@ -236,4 +250,25 @@ generate_http_code_map(id_schema_error, Id) -> iolist_to_binary(io_lib:format("Message ID ~p schema error", [Id]))}; generate_http_code_map(not_found, Id) -> #{code => ?MESSAGE_ID_NOT_FOUND, message => - iolist_to_binary(io_lib:format("Message ID ~p not found", [Id]))}. + iolist_to_binary(io_lib:format("Message ID ~p not found", [Id]))}; +generate_http_code_map(invalid_node, Node) -> + #{code => ?INVALID_NODE, message => + iolist_to_binary(io_lib:format("The node name ~p is invalid", [Node]))}. + +make_maybe(X, Error, Fun) -> + try Fun(X) of + Right -> + Right + catch _:_ -> + {left, X, Error} + end. + +with_maybe(Maybes, Cont) -> + with_maybe(Maybes, Cont, []). + +with_maybe([], Cont, Rights) -> + erlang:apply(Cont, lists:reverse(Rights)); +with_maybe([{left, X, Error} | _], _Cont, _Rights) -> + {400, generate_http_code_map(Error, X)}; +with_maybe([Right | T], Cont, Rights) -> + with_maybe(T, Cont, [Right | Rights]). diff --git a/apps/emqx_modules/src/proto/emqx_delayed_proto_v1.erl b/apps/emqx_modules/src/proto/emqx_delayed_proto_v1.erl new file mode 100644 index 000000000..4c4a55dc4 --- /dev/null +++ b/apps/emqx_modules/src/proto/emqx_delayed_proto_v1.erl @@ -0,0 +1,37 @@ +%%-------------------------------------------------------------------- +%%Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_delayed_proto_v1). + +-behaviour(emqx_bpapi). + +-export([ introduced_in/0 + , get_delayed_message/2 + , delete_delayed_message/2 + ]). + +-include_lib("emqx/include/bpapi.hrl"). + +introduced_in() -> + "5.0.0". + +-spec get_delayed_message(node(), binary()) -> emqx_delayed:with_id_return(map()) | emqx_rpc:badrpc(). +get_delayed_message(Node, HexId) -> + rpc:call(Node, emqx_delayed, get_delayed_message, [HexId]). + +-spec delete_delayed_message(node(), binary()) -> emqx_delayed:with_id_return() | emqx_rpc:badrpc(). +delete_delayed_message(Node, HexId) -> + rpc:call(Node, emqx_delayed, delete_delayed_message, [HexId]).