From 5520326ce3435fa8b48a99ae1ce213e8f7168df9 Mon Sep 17 00:00:00 2001 From: DDDHuang <904897578@qq.com> Date: Tue, 17 Aug 2021 13:46:03 +0800 Subject: [PATCH] feat: add delayed api --- apps/emqx_management/src/emqx_mgmt_util.erl | 4 +- apps/emqx_modules/etc/emqx_modules.conf | 1 + apps/emqx_modules/src/emqx_delayed.erl | 67 +++++- apps/emqx_modules/src/emqx_delayed_api.erl | 229 ++++++++++++-------- apps/emqx_modules/src/emqx_rewrite_api.erl | 17 +- 5 files changed, 224 insertions(+), 94 deletions(-) diff --git a/apps/emqx_management/src/emqx_mgmt_util.erl b/apps/emqx_management/src/emqx_mgmt_util.erl index 7d5f85cf2..d764afb07 100644 --- a/apps/emqx_management/src/emqx_mgmt_util.erl +++ b/apps/emqx_management/src/emqx_mgmt_util.erl @@ -140,6 +140,8 @@ response_error_schema(Description, Enum) -> response_page_schema(Def) when is_atom(Def) -> response_page_schema(atom_to_binary(Def, utf8)); response_page_schema(Def) when is_binary(Def) -> + response_page_schema(minirest:ref(Def)); +response_page_schema(ItemSchema) when is_map(ItemSchema) -> Schema = #{ type => object, properties => #{ @@ -154,7 +156,7 @@ response_page_schema(Def) when is_binary(Def) -> type => integer}}}, data => #{ type => array, - items => minirest:ref(Def)}}}, + items => ItemSchema}}}, json_content_schema("", Schema). response_batch_schema(DefName) when is_atom(DefName) -> diff --git a/apps/emqx_modules/etc/emqx_modules.conf b/apps/emqx_modules/etc/emqx_modules.conf index 92f563342..0200d16c5 100644 --- a/apps/emqx_modules/etc/emqx_modules.conf +++ b/apps/emqx_modules/etc/emqx_modules.conf @@ -1,5 +1,6 @@ delayed: { enable: true + ## 0 is no limit max_delayed_messages: 0 } diff --git a/apps/emqx_modules/src/emqx_delayed.erl b/apps/emqx_modules/src/emqx_delayed.erl index 5e1754f4b..da888e547 100644 --- a/apps/emqx_modules/src/emqx_delayed.erl +++ b/apps/emqx_modules/src/emqx_delayed.erl @@ -21,7 +21,6 @@ -include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/logger.hrl"). - %% Mnesia bootstrap -export([mnesia/1]). @@ -44,6 +43,11 @@ %% gen_server callbacks -export([ enable/0 , disable/0 + , set_max_delayed_messages/1 + , update_config/2 + , list/1 + , get_delayed_message/1 + , delete_delayed_message/1 ]). -record(delayed_message, {key, msg}). @@ -117,6 +121,64 @@ enable() -> disable() -> gen_server:call(?SERVER, disable). +set_max_delayed_messages(Max) -> + gen_server:call(?SERVER, {set_max_delayed_messages, Max}). + +list(Params) -> + emqx_mgmt_api:paginate(?TAB, Params, fun format_delayed/1). + +format_delayed(Delayed) -> + format_delayed(Delayed, false). + +format_delayed(#delayed_message{key = {TimeStamp, Id}, + msg = #message{topic = Topic, + from = From, + headers = #{username := Username}, + qos = Qos, + payload = Payload}}, WithPayload) -> + Result = #{ + id => emqx_guid:to_hexstr(Id), + publish_time => list_to_binary(calendar:system_time_to_rfc3339(TimeStamp, [{unit, second}])), + topic => Topic, + qos => Qos, + from_clientid => From, + from_username => Username + }, + case WithPayload of + true -> + Result#{payload => Payload}; + _ -> + Result + end. + +get_delayed_message(Id0) -> + Id = emqx_guid:from_hexstr(Id0), + Ms = [{{delayed_message,{'_',Id},'_'},[],['$_']}], + case ets:select(?TAB, Ms) of + [] -> + {error, not_found}; + Rows -> + Message = hd(Rows), + {ok, format_delayed(Message, true)} + end. + +delete_delayed_message(Id0) -> + Id = emqx_guid:from_hexstr(Id0), + Ms = [{{delayed_message, {'$1', Id}, '_'}, [], ['$1']}], + case ets:select(?TAB, Ms) of + [] -> + {error, not_found}; + Rows -> + Timestamp = hd(Rows), + ekka_mnesia:dirty_delete(?TAB, {Timestamp, Id}) + end. + +update_config(Enable, MaxDelayedMessages) -> + Opts0 = emqx_config:get_raw([<<"delayed">>], #{}), + Opts1 = maps:put(<<"enable">>, Enable, Opts0), + Opts = maps:put(<<"max_delayed_messages">>, MaxDelayedMessages, Opts1), + {ok, _} = emqx:update_config([delayed], Opts). + %%-------------------------------------------------------------------- %% gen_server callback %%-------------------------------------------------------------------- @@ -128,6 +190,9 @@ init([Opts]) -> publish_at => 0, max_delayed_messages => MaxDelayedMessages}))}. +handle_call({set_max_delayed_messages, Max}, _From, State) -> + {reply, ok, State#{max_delayed_messages => Max}}; + handle_call({store, DelayedMsg = #delayed_message{key = Key}}, _From, State = #{max_delayed_messages := 0}) -> ok = ekka_mnesia:dirty_write(?TAB, DelayedMsg), diff --git a/apps/emqx_modules/src/emqx_delayed_api.erl b/apps/emqx_modules/src/emqx_delayed_api.erl index 06de3ab13..85226701a 100644 --- a/apps/emqx_modules/src/emqx_delayed_api.erl +++ b/apps/emqx_modules/src/emqx_delayed_api.erl @@ -20,110 +20,133 @@ -import(emqx_mgmt_util, [ response_schema/1 , response_schema/2 + , response_error_schema/2 + , response_page_schema/1 , request_body_schema/1 ]). -% -export([cli/1]). - -export([ status/2 , delayed_messages/2 - , delete_delayed_message/2 + , delayed_message/2 ]). --export([enable_delayed/2]). +%% for rpc +-export([update_config_/2]). -export([api_spec/0]). +-define(ALREADY_ENABLED, 'ALREADY_ENABLED'). +-define(ALREADY_DISABLED, 'ALREADY_DISABLED'). + +-define(BAD_REQUEST, 'BAD_REQUEST'). + +-define(MESSAGE_ID_NOT_FOUND, 'ALREADY_DISABLED'). + api_spec() -> - {[status(), delayed_messages(), delete_delayed_message()], - [delayed_message_schema()]}. + { + [status_api(), delayed_messages_api(), delayed_message_api()], + [] + }. +delayed_schema() -> + delayed_schema(false). -delayed_message_schema() -> - #{broker_info => #{ +delayed_schema(WithPayload) -> + case WithPayload of + true -> + #{ + type => object, + properties => delayed_message_properties() + }; + _ -> + #{ + type => object, + properties => maps:without([payload], delayed_message_properties()) + } + end. + +delayed_message_properties() -> + #{ + id => #{ + type => integer, + description => <<"Message Id (MQTT message id hash)">>}, + publish_time => #{ + type => string, + description => <<"publish time, rfc 3339">>}, + topic => #{ + type => string, + description => <<"Topic">>}, + qos => #{ + type => integer, + enum => [0, 1, 2], + description => <<"Qos">>}, + payload => #{ + type => string, + description => <<"Payload">>}, + form_clientid => #{ + type => string, + description => <<"Client ID">>}, + form_username => #{ + type => string, + description => <<"Username">>} + }. + +status_api() -> + Schema = #{ type => object, properties => #{ - msgid => #{ - type => string, - description => <<"Message Id">> - } - } - }}. - -status() -> + enable => #{ + type => boolean}, + max_delayed_messages => #{ + type => integer, + description => <<"Max limit, 0 is no limit">>}}}, Metadata = #{ get => #{ description => "Get delayed status", responses => #{ - <<"200">> => response_schema(<<"Bad Request">>, - #{ - type => object, - properties => #{enable => #{type => boolean}} - } - ) - } - }, + <<"200">> => response_schema(<<"Bad Request">>, Schema)}}, put => #{ - description => "Enable or disbale delayed", - 'requestBody' => request_body_schema(#{ - type => object, - properties => #{ - enable => #{ - type => boolean - } - } - }), + description => "Enable or disable delayed, set max delayed messages", + 'requestBody' => request_body_schema(Schema), responses => #{ <<"200">> => - response_schema(<<"Enable or disbale delayed successfully">>), + response_schema(<<"Enable or disable delayed successfully">>), <<"400">> => - response_schema(<<"Bad Request">>, - #{ - type => object, - properties => #{ - message => #{type => string}, - code => #{type => string} - } - } - ) - } - } - }, - {"/delayed/status", Metadata, status}. + response_error_schema(<<"Already disabled or enabled">>, [?ALREADY_ENABLED, ?ALREADY_DISABLED])}}}, + {"/mqtt/delayed_messages/status", Metadata, status}. -delayed_messages() -> +delayed_messages_api() -> Metadata = #{ get => #{ - description => "Get delayed message list", + description => "List delayed messages", responses => #{ - <<"200">> => emqx_mgmt_util:response_array_schema(<<>>, delayed_message) - } - } - }, - {"/delayed/messages", Metadata, delayed_messages}. + <<"200">> => response_page_schema(delayed_schema())}}}, + {"/mqtt/delayed_messages", Metadata, delayed_messages}. -delete_delayed_message() -> +delayed_message_api() -> Metadata = #{ - delete => #{ - description => "Delete delayed message", + get => #{ + description => "Get delayed message", parameters => [#{ - name => msgid, + name => id, in => path, schema => #{type => string}, required => true }], responses => #{ - <<"200">> => response_schema(<<"Bad Request">>, - #{ - type => object, - properties => #{enable => #{type => boolean}} - } - ) - } - } - }, - {"/delayed/messages/:msgid", Metadata, delete_delayed_message}. - + <<"200">> => response_schema(<<"Get delayed message success">>, delayed_schema(true)), + <<"404">> => response_error_schema(<<"Message ID not found">>, [?MESSAGE_ID_NOT_FOUND])}}, + delete => #{ + description => "Delete delayed message", + parameters => [#{ + name => id, + in => path, + schema => #{type => string}, + required => true + }], + responses => #{ + <<"200">> => response_schema(<<"Delete delayed message success">>)}}}, + {"/mqtt/delayed_messages/:id", Metadata, delayed_message}. %%-------------------------------------------------------------------- %% HTTP API @@ -135,33 +158,60 @@ status(put, Request) -> {ok, Body, _} = cowboy_req:read_body(Request), Params = emqx_json:decode(Body, [return_maps]), Enable = maps:get(<<"enable">>, Params), - case Enable =:= get_status() of - true -> - Reason = case Enable of - true -> <<"Telemetry status is already enabled">>; - false -> <<"Telemetry status is already disable">> - end, - {400, #{code => "BAD_REQUEST", message => Reason}}; - false -> - enable_delayed(Enable), - {200} - end. + MaxDelayedMessages = maps:get(<<"max_delayed_messages">>, Params), + update_config(Enable, MaxDelayedMessages). -delayed_messages(get, _Request) -> - {200, []}. +delayed_messages(get, Request) -> + Qs = cowboy_req:parse_qs(Request), + {200, emqx_delayed:list(Qs)}. -delete_delayed_message(delete, _Request) -> +delayed_message(get, Request) -> + Id = cowboy_req:binding(id, Request), + case emqx_delayed:get_delayed_message(Id) of + {ok, Message} -> + {200, Message}; + {error, not_found} -> + Message = list_to_binary(io_lib:format("Message ID ~p not found", [Id])), + {404, #{code => ?MESSAGE_ID_NOT_FOUND, message => Message}} + end; +delayed_message(delete, Request) -> + Id = cowboy_req:binding(id, Request), + _ = emqx_delayed:delete_delayed_message(Id), {200}. %%-------------------------------------------------------------------- %% internal function %%-------------------------------------------------------------------- -enable_delayed(Enable) -> +get_status() -> + #{ + enable => emqx:get_config([delayed, enable], true), + max_delayed_messages => emqx:get_config([delayed, max_delayed_messages], 0) + }. + +update_config(Enable, MaxDelayedMessages) when MaxDelayedMessages >= 0 -> + case Enable =:= maps:get(enable, get_status()) of + true -> + update_config_error_response(Enable); + _ -> + update_config_(Enable, MaxDelayedMessages), + {200} + end; +update_config(_Enable, _MaxDelayedMessages) -> + {400, #{code => ?BAD_REQUEST, message => <<"Max delayed must be equal or greater than 0">>}}. + +update_config_error_response(true) -> + {400, #{code => ?ALREADY_ENABLED, message => <<"Delayed message status is already enabled">>}}; +update_config_error_response(false) -> + {400, #{code => ?ALREADY_DISABLED, message => <<"Delayed message status is already disable">>}}. + +update_config_(Enable, MaxDelayedMessages) -> lists:foreach(fun(Node) -> - enable_delayed(Node, Enable) + update_config_(Node, Enable, MaxDelayedMessages) end, ekka_mnesia:running_nodes()). -enable_delayed(Node, Enable) when Node =:= node() -> +update_config_(Node, Enable, MaxDelayedMessages) when Node =:= node() -> + _ = emqx_delayed:update_config(Enable, MaxDelayedMessages), + ok = emqx_delayed:set_max_delayed_messages(MaxDelayedMessages), case Enable of true -> emqx_delayed:enable(); @@ -169,14 +219,11 @@ enable_delayed(Node, Enable) when Node =:= node() -> emqx_delayed:disable() end; -enable_delayed(Node, Enable) -> - rpc_call(Node, ?MODULE, enable_delayed, [Node, Enable]). +update_config_(Node, Enable, MaxDelayedMessages) -> + rpc_call(Node, ?MODULE, update_config_, [Node, Enable, MaxDelayedMessages]). rpc_call(Node, Module, Fun, Args) -> case rpc:call(Node, Module, Fun, Args) of {badrpc, Reason} -> {error, Reason}; Result -> Result end. - -get_status() -> - emqx_config:get([delayed, enable], true). diff --git a/apps/emqx_modules/src/emqx_rewrite_api.erl b/apps/emqx_modules/src/emqx_rewrite_api.erl index 2be28a081..9b07b0a93 100644 --- a/apps/emqx_modules/src/emqx_rewrite_api.erl +++ b/apps/emqx_modules/src/emqx_rewrite_api.erl @@ -1,3 +1,18 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- -module(emqx_rewrite_api). -behaviour(minirest_api). @@ -44,7 +59,7 @@ rewrite_api() -> post => #{ description => <<"Update topic rewrite">>, 'requestBody' => emqx_mgmt_util:request_body_array_schema(topic_rewrite_schema()), - response => #{ + responses => #{ <<"200">> => emqx_mgmt_util:response_schema(<<"Update topic rewrite success">>, topic_rewrite_schema()), <<"413">> => emqx_mgmt_util:response_error_schema(<<"Rules count exceed max limit">>, [?EXCEED_LIMIT])}}},