diff --git a/apps/emqx_modules/src/emqx_delayed.erl b/apps/emqx_modules/src/emqx_delayed.erl index 21757b528..86ef97dac 100644 --- a/apps/emqx_modules/src/emqx_delayed.erl +++ b/apps/emqx_modules/src/emqx_delayed.erl @@ -164,13 +164,17 @@ to_rfc3339(Timestamp) -> list_to_binary(calendar:system_time_to_rfc3339(Timestamp, [{unit, second}])). get_delayed_message(Id0) -> - Id = emqx_guid:from_hexstr(Id0), - case ets:select(?TAB, ?QUERY_MS(Id)) of - [] -> - {error, not_found}; - Rows -> - Message = hd(Rows), - {ok, format_delayed(Message, true)} + try emqx_guid:from_hexstr(Id0) of + Id -> + case ets:select(?TAB, ?QUERY_MS(Id)) of + [] -> + {error, not_found}; + Rows -> + Message = hd(Rows), + {ok, format_delayed(Message, true)} + end + catch + error:function_clause -> {error, id_schema_error} end. delete_delayed_message(Id0) -> diff --git a/apps/emqx_modules/src/emqx_delayed_api.erl b/apps/emqx_modules/src/emqx_delayed_api.erl index 96589bf06..5975dfdd4 100644 --- a/apps/emqx_modules/src/emqx_delayed_api.erl +++ b/apps/emqx_modules/src/emqx_delayed_api.erl @@ -46,6 +46,7 @@ -define(BAD_REQUEST, 'BAD_REQUEST'). -define(MESSAGE_ID_NOT_FOUND, 'MESSAGE_ID_NOT_FOUND'). +-define(MESSAGE_ID_SCHEMA_ERROR, 'MESSAGE_ID_SCHEMA_ERROR'). api_spec() -> { @@ -117,14 +118,17 @@ delayed_message_api() -> description => <<"Get delayed message">>, parameters => parameters(), responses => #{ - <<"200">> => object_schema(maps:without([payload], properties()), <<"Get delayed message success">>), - <<"404">> => error_schema(<<"Message ID not found">>, [?MESSAGE_ID_NOT_FOUND]) + <<"400">> => error_schema(<<"Message ID Schema error">>, [?MESSAGE_ID_SCHEMA_ERROR]), + <<"404">> => error_schema(<<"Message ID not found">>, [?MESSAGE_ID_NOT_FOUND]), + <<"200">> => object_schema(maps:without([payload], properties()), <<"Get delayed message success">>) } }, delete => #{ description => <<"Delete delayed message">>, parameters => parameters(), responses => #{ + <<"400">> => error_schema(<<"Message ID Schema error">>, [?MESSAGE_ID_SCHEMA_ERROR]), + <<"404">> => error_schema(<<"Message ID not found">>, [?MESSAGE_ID_NOT_FOUND]), <<"200">> => schema(<<"Delete delayed message success">>) } } @@ -153,13 +157,21 @@ delayed_message(get, #{bindings := #{msgid := Id}}) -> _ -> {200, Message#{payload => base64:encode(Payload)}} end; + {error, id_schema_error} -> + {400, generate_http_code_map(id_schema_error, Id)}; {error, not_found} -> - Message = iolist_to_binary(io_lib:format("Message ID ~p not found", [Id])), - {404, #{code => ?MESSAGE_ID_NOT_FOUND, message => Message}} + {404, generate_http_code_map(not_found, Id)} end; delayed_message(delete, #{bindings := #{msgid := Id}}) -> - _ = emqx_delayed:delete_delayed_message(Id), - {200}. + case emqx_delayed:get_delayed_message(Id) of + {ok, _Message} -> + _ = emqx_delayed:delete_delayed_message(Id), + {200}; + {error, id_schema_error} -> + {400, generate_http_code_map(id_schema_error, Id)}; + {error, not_found} -> + {404, generate_http_code_map(not_found, Id)} + end. %%-------------------------------------------------------------------- %% internal function @@ -220,6 +232,11 @@ update_config_(Node, Config) when Node =:= node() -> update_config_(Node, Config) -> rpc_call(Node, ?MODULE, ?FUNCTION_NAME, [Node, Config]). +generate_http_code_map(id_schema_error, Id) -> + #{code => ?MESSAGE_ID_SCHEMA_ERROR, message => iolist_to_binary(io_lib:format("Message ID ~p schema error", [Id]))}; +generate_http_code_map(not_found, Id) -> + #{code => ?MESSAGE_ID_NOT_FOUND, message => iolist_to_binary(io_lib:format("Message ID ~p not found", [Id]))}. + rpc_call(Node, Module, Fun, Args) -> case rpc:call(Node, Module, Fun, Args) of {badrpc, Reason} -> {error, Reason};