fix(api): fix the payload in the result of the api of emqx_retainer & emqx_dealy

This commit is contained in:
lafirest 2021-12-24 14:52:10 +08:00
parent e30a4d8d8d
commit c68499e39a
2 changed files with 26 additions and 32 deletions

View File

@ -44,6 +44,7 @@
-define(MESSAGE_ID_NOT_FOUND, 'MESSAGE_ID_NOT_FOUND'). -define(MESSAGE_ID_NOT_FOUND, 'MESSAGE_ID_NOT_FOUND').
-define(MESSAGE_ID_SCHEMA_ERROR, 'MESSAGE_ID_SCHEMA_ERROR'). -define(MESSAGE_ID_SCHEMA_ERROR, 'MESSAGE_ID_SCHEMA_ERROR').
-define(MAX_PAYLOAD_SIZE, 1048576). %% 1MB = 1024 x 1024
api_spec() -> api_spec() ->
emqx_dashboard_swagger:spec(?MODULE). emqx_dashboard_swagger:spec(?MODULE).
@ -157,11 +158,11 @@ delayed_message(get, #{bindings := #{msgid := Id}}) ->
case emqx_delayed:get_delayed_message(Id) of case emqx_delayed:get_delayed_message(Id) of
{ok, Message} -> {ok, Message} ->
Payload = maps:get(payload, Message), Payload = maps:get(payload, Message),
case size(Payload) > ?MAX_PAYLOAD_LENGTH of case erlang:byte_size(Payload) > ?MAX_PAYLOAD_SIZE of
true -> true ->
{200, Message#{payload => ?PAYLOAD_TOO_LARGE}}; {200, Message};
_ -> _ ->
{200, Message#{payload => Payload}} {200, Message#{payload => base64:encode(Payload)}}
end; end;
{error, id_schema_error} -> {error, id_schema_error} ->
{400, generate_http_code_map(id_schema_error, Id)}; {400, generate_http_code_map(id_schema_error, Id)};

View File

@ -28,13 +28,14 @@
-import(emqx_mgmt_api_configs, [gen_schema/1]). -import(emqx_mgmt_api_configs, [gen_schema/1]).
-import(emqx_mgmt_util, [ object_array_schema/2 -import(emqx_mgmt_util, [ object_array_schema/2
, object_schema/2
, schema/1 , schema/1
, schema/2 , schema/2
, error_schema/2 , error_schema/2
, page_params/0 , page_params/0
, properties/1]). , properties/1]).
-define(MAX_BASE64_PAYLOAD_SIZE, 1048576). %% 1MB = 1024 x 1024 -define(MAX_PAYLOAD_SIZE, 1048576). %% 1MB = 1024 x 1024
api_spec() -> api_spec() ->
{[lookup_retained_api(), with_topic_api(), config_api()], []}. {[lookup_retained_api(), with_topic_api(), config_api()], []}.
@ -64,7 +65,7 @@ parameters() ->
lookup_retained_api() -> lookup_retained_api() ->
Metadata = #{ Metadata = #{
get => #{ get => #{
description => <<"lookup matching messages">>, description => <<"List retained messages">>,
parameters => page_params(), parameters => page_params(),
responses => #{ responses => #{
<<"200">> => object_array_schema( <<"200">> => object_array_schema(
@ -80,9 +81,10 @@ with_topic_api() ->
MetaData = #{ MetaData = #{
get => #{ get => #{
description => <<"lookup matching messages">>, description => <<"lookup matching messages">>,
parameters => parameters() ++ page_params(), parameters => parameters(),
responses => #{ responses => #{
<<"200">> => object_array_schema(message_props(), <<"List retained messages">>), <<"200">> => object_schema(message_props(), <<"List retained messages">>),
<<"404">> => error_schema(<<"Reatined Not Exists">>, ['NOT_FOUND']),
<<"405">> => schema(<<"NotAllowed">>) <<"405">> => schema(<<"NotAllowed">>)
} }
}, },
@ -139,35 +141,27 @@ config(put, #{body := Body}) ->
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Interval Funcs %% Interval Funcs
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
lookup_retained(get, Params) -> lookup_retained(get, #{query_string := Qs}) ->
lookup(undefined, Params, fun format_message/1). Page = maps:get(page, Qs, 1),
Limit = maps:get(page, Qs, emqx_mgmt:max_row_limit()),
{ok, Msgs} = emqx_retainer_mnesia:page_read(undefined, undefined, Page, Limit),
{200, [format_message(Msg) || Msg <- Msgs]}.
with_topic(get, #{bindings := Bindings} = Params) -> with_topic(get, #{bindings := Bindings}) ->
Topic = maps:get(topic, Bindings), Topic = maps:get(topic, Bindings),
lookup(Topic, Params, fun format_detail_message/1); {ok, Msgs} = emqx_retainer_mnesia:page_read(undefined, Topic, 1, 1),
case Msgs of
[H | _] ->
{200, format_detail_message(H)};
_ ->
{404, #{code => 'NOT_FOUND'}}
end;
with_topic(delete, #{bindings := Bindings}) -> with_topic(delete, #{bindings := Bindings}) ->
Topic = maps:get(topic, Bindings), Topic = maps:get(topic, Bindings),
emqx_retainer_mnesia:delete_message(undefined, Topic), emqx_retainer_mnesia:delete_message(undefined, Topic),
{204}. {204}.
-spec lookup(undefined | binary(),
map(),
fun((emqx_types:message()) -> map())) ->
{200, map()}.
lookup(Topic, #{query_string := Qs}, Formatter) ->
Page = maps:get(page, Qs, 1),
Limit = maps:get(page, Qs, emqx_mgmt:max_row_limit()),
{ok, Msgs} = emqx_retainer_mnesia:page_read(undefined, Topic, Page, Limit),
{200, format_message(Msgs, Formatter)}.
format_message(Messages, Formatter) when is_list(Messages)->
[Formatter(Message) || Message <- Messages];
format_message(Message, Formatter) ->
Formatter(Message).
format_message(#message{ id = ID, qos = Qos, topic = Topic, from = From format_message(#message{ id = ID, qos = Qos, topic = Topic, from = From
, timestamp = Timestamp, headers = Headers}) -> , timestamp = Timestamp, headers = Headers}) ->
#{msgid => emqx_guid:to_hexstr(ID), #{msgid => emqx_guid:to_hexstr(ID),
@ -181,12 +175,11 @@ format_message(#message{ id = ID, qos = Qos, topic = Topic, from = From
format_detail_message(#message{payload = Payload} = Msg) -> format_detail_message(#message{payload = Payload} = Msg) ->
Base = format_message(Msg), Base = format_message(Msg),
EncodePayload = base64:encode(Payload), case erlang:byte_size(Payload) =< ?MAX_PAYLOAD_SIZE of
case erlang:byte_size(EncodePayload) =< ?MAX_BASE64_PAYLOAD_SIZE of
true -> true ->
Base#{payload => EncodePayload}; Base#{payload => base64:encode(Payload)};
_ -> _ ->
Base#{payload => base64:encode(<<"PAYLOAD_TOO_LARGE">>)} Base
end. end.
to_bin_string(Data) when is_binary(Data) -> to_bin_string(Data) when is_binary(Data) ->