From cc1b8eca6c7069301eeb2e9323c6537db6a07a3f Mon Sep 17 00:00:00 2001 From: DDDHuang <904897578@qq.com> Date: Thu, 19 Aug 2021 12:12:14 +0800 Subject: [PATCH] fix: params & config generate; api base64 encode payload --- apps/emqx_modules/src/emqx_delayed.erl | 12 +-- apps/emqx_modules/src/emqx_delayed_api.erl | 104 ++++++++++++++------- 2 files changed, 75 insertions(+), 41 deletions(-) diff --git a/apps/emqx_modules/src/emqx_delayed.erl b/apps/emqx_modules/src/emqx_delayed.erl index da888e547..c982198bb 100644 --- a/apps/emqx_modules/src/emqx_delayed.erl +++ b/apps/emqx_modules/src/emqx_delayed.erl @@ -44,7 +44,7 @@ -export([ enable/0 , disable/0 , set_max_delayed_messages/1 - , update_config/2 + , update_config/1 , list/1 , get_delayed_message/1 , delete_delayed_message/1 @@ -146,7 +146,7 @@ format_delayed(#delayed_message{key = {TimeStamp, Id}, }, case WithPayload of true -> - Result#{payload => Payload}; + Result#{payload => base64:encode(Payload)}; _ -> Result end. @@ -172,12 +172,8 @@ delete_delayed_message(Id0) -> Timestamp = hd(Rows), ekka_mnesia:dirty_delete(?TAB, {Timestamp, Id}) end. - -update_config(Enable, MaxDelayedMessages) -> - Opts0 = emqx_config:get_raw([<<"delayed">>], #{}), - Opts1 = maps:put(<<"enable">>, Enable, Opts0), - Opts = maps:put(<<"max_delayed_messages">>, MaxDelayedMessages, Opts1), - {ok, _} = emqx:update_config([delayed], Opts). +update_config(Config) -> + {ok, _} = emqx:update_config([delayed], Config). %%-------------------------------------------------------------------- %% gen_server callback diff --git a/apps/emqx_modules/src/emqx_delayed_api.erl b/apps/emqx_modules/src/emqx_delayed_api.erl index 85226701a..2d6c3ddf0 100644 --- a/apps/emqx_modules/src/emqx_delayed_api.erl +++ b/apps/emqx_modules/src/emqx_delayed_api.erl @@ -25,13 +25,16 @@ , request_body_schema/1 ]). +-define(MAX_PAYLOAD_LENGTH, 2048). +-define(PAYLOAD_TOO_LARGE, 'PAYLOAD_TOO_LARGE'). + -export([ status/2 , delayed_messages/2 , delayed_message/2 ]). %% for rpc --export([update_config_/2]). +-export([update_config_/1]). -export([api_spec/0]). @@ -40,7 +43,7 @@ -define(BAD_REQUEST, 'BAD_REQUEST'). --define(MESSAGE_ID_NOT_FOUND, 'ALREADY_DISABLED'). +-define(MESSAGE_ID_NOT_FOUND, 'MESSAGE_ID_NOT_FOUND'). api_spec() -> { @@ -66,6 +69,9 @@ delayed_schema(WithPayload) -> end. delayed_message_properties() -> + PayloadDesc = list_to_binary( + io_lib:format("Payload, base64 encode. Payload will be ~p if length large than ~p", + [?PAYLOAD_TOO_LARGE, ?MAX_PAYLOAD_LENGTH])), #{ id => #{ type => integer, @@ -82,7 +88,7 @@ delayed_message_properties() -> description => <<"Qos">>}, payload => #{ type => string, - description => <<"Payload">>}, + description => PayloadDesc}, form_clientid => #{ type => string, description => <<"Client ID">>}, @@ -110,7 +116,7 @@ status_api() -> 'requestBody' => request_body_schema(Schema), responses => #{ <<"200">> => - response_schema(<<"Enable or disable delayed successfully">>), + response_schema(<<"Enable or disable delayed successfully">>, Schema), <<"400">> => response_error_schema(<<"Already disabled or enabled">>, [?ALREADY_ENABLED, ?ALREADY_DISABLED])}}}, {"/mqtt/delayed_messages/status", Metadata, status}. @@ -156,10 +162,8 @@ status(get, _Request) -> status(put, Request) -> {ok, Body, _} = cowboy_req:read_body(Request), - Params = emqx_json:decode(Body, [return_maps]), - Enable = maps:get(<<"enable">>, Params), - MaxDelayedMessages = maps:get(<<"max_delayed_messages">>, Params), - update_config(Enable, MaxDelayedMessages). + Config = emqx_json:decode(Body, [return_maps]), + update_config(Config). delayed_messages(get, Request) -> Qs = cowboy_req:parse_qs(Request), @@ -169,7 +173,13 @@ delayed_message(get, Request) -> Id = cowboy_req:binding(id, Request), case emqx_delayed:get_delayed_message(Id) of {ok, Message} -> - {200, Message}; + Payload = maps:get(payload, Message), + case size(Payload) > ?MAX_PAYLOAD_LENGTH of + true -> + {200, Message#{payload => ?PAYLOAD_TOO_LARGE}}; + _ -> + {200, Message#{payload => base64:encode(Payload)}} + end; {error, not_found} -> Message = list_to_binary(io_lib:format("Message ID ~p not found", [Id])), {404, #{code => ?MESSAGE_ID_NOT_FOUND, message => Message}} @@ -183,44 +193,72 @@ delayed_message(delete, Request) -> %% internal function %%-------------------------------------------------------------------- get_status() -> - #{ - enable => emqx:get_config([delayed, enable], true), - max_delayed_messages => emqx:get_config([delayed, max_delayed_messages], 0) - }. + emqx:get_config([delayed], #{}). -update_config(Enable, MaxDelayedMessages) when MaxDelayedMessages >= 0 -> - case Enable =:= maps:get(enable, get_status()) of - true -> - update_config_error_response(Enable); +update_config(Config) -> + case generate_config(Config) of + {ok, Config} -> + update_config_(Config), + {200, get_status()}; + {error, {Code, Message}} -> + {400, #{code => Code, message => Message}} + end. +generate_config(Config) -> + generate_config(Config, [fun generate_enable/1, fun generate_max_delayed_messages/1]). + +generate_config(Config, []) -> + {ok, Config}; +generate_config(Config, [Fun | Tail]) -> + case Fun(Config) of + {ok, Config} -> + generate_config(Config, Tail); + {error, CodeMessage} -> + {error, CodeMessage} + end. + +generate_enable(Config = #{<<"enable">> := Enable}) -> + case {Enable =:= maps:get(enable, get_status()), Enable} of + {true, true} -> + {error, {?ALREADY_ENABLED, <<"Delayed message status is already enabled">>}}; + {true, false} -> + {error, {?ALREADY_DISABLED, <<"Delayed message status is already disable">>}}; _ -> - update_config_(Enable, MaxDelayedMessages), - {200} + {ok, Config} end; -update_config(_Enable, _MaxDelayedMessages) -> - {400, #{code => ?BAD_REQUEST, message => <<"Max delayed must be equal or greater than 0">>}}. +generate_enable(Config) -> + {ok, Config}. -update_config_error_response(true) -> - {400, #{code => ?ALREADY_ENABLED, message => <<"Delayed message status is already enabled">>}}; -update_config_error_response(false) -> - {400, #{code => ?ALREADY_DISABLED, message => <<"Delayed message status is already disable">>}}. +generate_max_delayed_messages(Config = #{<<"max_delayed_messages">> := Max}) when Max >= 0 -> + {ok, Config}; +generate_max_delayed_messages(#{<<"max_delayed_messages">> := Max}) when Max < 0 -> + {error, {?BAD_REQUEST, <<"Max delayed must be equal or greater than 0">>}}; +generate_max_delayed_messages(Config) -> + {ok, Config}. -update_config_(Enable, MaxDelayedMessages) -> +update_config_(Config) -> lists:foreach(fun(Node) -> - update_config_(Node, Enable, MaxDelayedMessages) + update_config_(Node, Config) end, ekka_mnesia:running_nodes()). -update_config_(Node, Enable, MaxDelayedMessages) when Node =:= node() -> - _ = emqx_delayed:update_config(Enable, MaxDelayedMessages), - ok = emqx_delayed:set_max_delayed_messages(MaxDelayedMessages), - case Enable of +update_config_(Node, Config) when Node =:= node() -> + _ = emqx_delayed:update_config(Config), + case maps:get(<<"enable">>, Config, undefined) of + undefined -> + ignore; true -> emqx_delayed:enable(); false -> emqx_delayed:disable() + end, + case maps:get(<<"max_delayed_messages">>, Config, undefined) of + undefined -> + ignore; + Max -> + ok = emqx_delayed:set_max_delayed_messages(Max) end; -update_config_(Node, Enable, MaxDelayedMessages) -> - rpc_call(Node, ?MODULE, update_config_, [Node, Enable, MaxDelayedMessages]). +update_config_(Node, Config) -> + rpc_call(Node, ?MODULE, ?FUNCTION_NAME, [Node, Config]). rpc_call(Node, Module, Fun, Args) -> case rpc:call(Node, Module, Fun, Args) of