diff --git a/apps/emqx_modules/src/emqx_delayed_api.erl b/apps/emqx_modules/src/emqx_delayed_api.erl index 8137d9e63..9199d7b2c 100644 --- a/apps/emqx_modules/src/emqx_delayed_api.erl +++ b/apps/emqx_modules/src/emqx_delayed_api.erl @@ -44,6 +44,7 @@ -define(MESSAGE_ID_NOT_FOUND, 'MESSAGE_ID_NOT_FOUND'). -define(MESSAGE_ID_SCHEMA_ERROR, 'MESSAGE_ID_SCHEMA_ERROR'). +-define(MAX_PAYLOAD_SIZE, 1048576). %% 1MB = 1024 x 1024 api_spec() -> emqx_dashboard_swagger:spec(?MODULE). @@ -157,11 +158,11 @@ delayed_message(get, #{bindings := #{msgid := Id}}) -> case emqx_delayed:get_delayed_message(Id) of {ok, Message} -> Payload = maps:get(payload, Message), - case size(Payload) > ?MAX_PAYLOAD_LENGTH of + case erlang:byte_size(Payload) > ?MAX_PAYLOAD_SIZE of true -> - {200, Message#{payload => ?PAYLOAD_TOO_LARGE}}; + {200, Message}; _ -> - {200, Message#{payload => Payload}} + {200, Message#{payload => base64:encode(Payload)}} end; {error, id_schema_error} -> {400, generate_http_code_map(id_schema_error, Id)}; diff --git a/apps/emqx_retainer/src/emqx_retainer_api.erl b/apps/emqx_retainer/src/emqx_retainer_api.erl index 26d341b53..5424629d9 100644 --- a/apps/emqx_retainer/src/emqx_retainer_api.erl +++ b/apps/emqx_retainer/src/emqx_retainer_api.erl @@ -28,13 +28,14 @@ -import(emqx_mgmt_api_configs, [gen_schema/1]). -import(emqx_mgmt_util, [ object_array_schema/2 + , object_schema/2 , schema/1 , schema/2 , error_schema/2 , page_params/0 , properties/1]). --define(MAX_BASE64_PAYLOAD_SIZE, 1048576). %% 1MB = 1024 x 1024 +-define(MAX_PAYLOAD_SIZE, 1048576). %% 1MB = 1024 x 1024 api_spec() -> {[lookup_retained_api(), with_topic_api(), config_api()], []}. @@ -64,7 +65,7 @@ parameters() -> lookup_retained_api() -> Metadata = #{ get => #{ - description => <<"lookup matching messages">>, + description => <<"List retained messages">>, parameters => page_params(), responses => #{ <<"200">> => object_array_schema( @@ -80,9 +81,10 @@ with_topic_api() -> MetaData = #{ get => #{ description => <<"lookup matching messages">>, - parameters => parameters() ++ page_params(), + parameters => parameters(), responses => #{ - <<"200">> => object_array_schema(message_props(), <<"List retained messages">>), + <<"200">> => object_schema(message_props(), <<"List retained messages">>), + <<"404">> => error_schema(<<"Reatined Not Exists">>, ['NOT_FOUND']), <<"405">> => schema(<<"NotAllowed">>) } }, @@ -139,35 +141,27 @@ config(put, #{body := Body}) -> %%------------------------------------------------------------------------------ %% Interval Funcs %%------------------------------------------------------------------------------ -lookup_retained(get, Params) -> - lookup(undefined, Params, fun format_message/1). +lookup_retained(get, #{query_string := Qs}) -> + Page = maps:get(page, Qs, 1), + Limit = maps:get(page, Qs, emqx_mgmt:max_row_limit()), + {ok, Msgs} = emqx_retainer_mnesia:page_read(undefined, undefined, Page, Limit), + {200, [format_message(Msg) || Msg <- Msgs]}. -with_topic(get, #{bindings := Bindings} = Params) -> +with_topic(get, #{bindings := Bindings}) -> Topic = maps:get(topic, Bindings), - lookup(Topic, Params, fun format_detail_message/1); + {ok, Msgs} = emqx_retainer_mnesia:page_read(undefined, Topic, 1, 1), + case Msgs of + [H | _] -> + {200, format_detail_message(H)}; + _ -> + {404, #{code => 'NOT_FOUND'}} + end; with_topic(delete, #{bindings := Bindings}) -> Topic = maps:get(topic, Bindings), emqx_retainer_mnesia:delete_message(undefined, Topic), {204}. --spec lookup(undefined | binary(), - map(), - fun((emqx_types:message()) -> map())) -> - {200, map()}. -lookup(Topic, #{query_string := Qs}, Formatter) -> - Page = maps:get(page, Qs, 1), - Limit = maps:get(page, Qs, emqx_mgmt:max_row_limit()), - {ok, Msgs} = emqx_retainer_mnesia:page_read(undefined, Topic, Page, Limit), - {200, format_message(Msgs, Formatter)}. - - -format_message(Messages, Formatter) when is_list(Messages)-> - [Formatter(Message) || Message <- Messages]; - -format_message(Message, Formatter) -> - Formatter(Message). - format_message(#message{ id = ID, qos = Qos, topic = Topic, from = From , timestamp = Timestamp, headers = Headers}) -> #{msgid => emqx_guid:to_hexstr(ID), @@ -181,12 +175,11 @@ format_message(#message{ id = ID, qos = Qos, topic = Topic, from = From format_detail_message(#message{payload = Payload} = Msg) -> Base = format_message(Msg), - EncodePayload = base64:encode(Payload), - case erlang:byte_size(EncodePayload) =< ?MAX_BASE64_PAYLOAD_SIZE of + case erlang:byte_size(Payload) =< ?MAX_PAYLOAD_SIZE of true -> - Base#{payload => EncodePayload}; + Base#{payload => base64:encode(Payload)}; _ -> - Base#{payload => base64:encode(<<"PAYLOAD_TOO_LARGE">>)} + Base end. to_bin_string(Data) when is_binary(Data) ->