emqx/apps/emqx_modules/src/emqx_delayed_api.erl

307 lines
10 KiB
Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_delayed_api).
-behaviour(minirest_api).
-include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl").
-include("emqx_modules.hrl").
-import(hoconsc, [mk/2, ref/1, ref/2]).
-export([
status/2,
delayed_messages/2,
delayed_message/2
]).
-export([
paths/0,
fields/1,
schema/1
]).
%% for rpc
-export([update_config_/1]).
-export([api_spec/0]).
-define(MAX_PAYLOAD_LENGTH, 2048).
-define(PAYLOAD_TOO_LARGE, <<"PAYLOAD_TOO_LARGE">>).
-define(INTERNAL_ERROR, 'INTERNAL_ERROR').
-define(BAD_REQUEST, 'BAD_REQUEST').
-define(MESSAGE_ID_NOT_FOUND, 'MESSAGE_ID_NOT_FOUND').
-define(MESSAGE_ID_SCHEMA_ERROR, 'MESSAGE_ID_SCHEMA_ERROR').
-define(INVALID_NODE, 'INVALID_NODE').
api_spec() ->
emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}).
paths() ->
[
"/mqtt/delayed",
"/mqtt/delayed/messages",
"/mqtt/delayed/messages/:node/:msgid"
].
schema("/mqtt/delayed") ->
#{
'operationId' => status,
get => #{
tags => ?API_TAG_MQTT,
description => ?DESC(view_status_api),
responses => #{
200 => ref(emqx_modules_schema, "delayed")
}
},
put => #{
tags => ?API_TAG_MQTT,
description => ?DESC(update_api),
'requestBody' => ref(emqx_modules_schema, "delayed"),
responses => #{
200 => mk(
ref(emqx_modules_schema, "delayed"),
#{desc => ?DESC(update_success)}
),
400 => emqx_dashboard_swagger:error_codes(
[?BAD_REQUEST],
?DESC(illegality_limit)
)
}
}
};
schema("/mqtt/delayed/messages/:node/:msgid") ->
#{
'operationId' => delayed_message,
get => #{
tags => ?API_TAG_MQTT,
description => ?DESC(get_message_api),
parameters => [
{node,
mk(
binary(),
#{in => path, desc => ?DESC(node)}
)},
{msgid, mk(binary(), #{in => path, desc => ?DESC(msgid)})}
],
responses => #{
200 => ref("message_without_payload"),
400 => emqx_dashboard_swagger:error_codes(
[?MESSAGE_ID_SCHEMA_ERROR, ?INVALID_NODE],
?DESC(bad_msgid_format)
),
404 => emqx_dashboard_swagger:error_codes(
[?MESSAGE_ID_NOT_FOUND],
?DESC(msgid_not_found)
)
}
},
delete => #{
tags => ?API_TAG_MQTT,
description => ?DESC(delete_api),
parameters => [
{node,
mk(
binary(),
#{in => path, desc => ?DESC(node)}
)},
{msgid, mk(binary(), #{in => path, desc => ?DESC(msgid)})}
],
responses => #{
204 => <<"Delete delayed message success">>,
400 => emqx_dashboard_swagger:error_codes(
[?MESSAGE_ID_SCHEMA_ERROR, ?INVALID_NODE],
?DESC(bad_msgid_format)
),
404 => emqx_dashboard_swagger:error_codes(
[?MESSAGE_ID_NOT_FOUND],
?DESC(msgid_not_found)
)
}
}
};
schema("/mqtt/delayed/messages") ->
#{
'operationId' => delayed_messages,
get => #{
tags => ?API_TAG_MQTT,
description => ?DESC(list_api),
parameters => [ref(emqx_dashboard_swagger, page), ref(emqx_dashboard_swagger, limit)],
responses => #{
200 =>
[
{data, mk(hoconsc:array(ref("message")), #{})},
{meta, [
{page, mk(pos_integer(), #{desc => ?DESC(view_page)})},
{limit, mk(pos_integer(), #{desc => ?DESC(view_limit)})},
{count, mk(non_neg_integer(), #{desc => ?DESC(count)})}
]}
]
}
}
}.
fields("message_without_payload") ->
[
{msgid, mk(integer(), #{desc => ?DESC(msgid)})},
{node, mk(binary(), #{desc => ?DESC(node)})},
{publish_at, mk(binary(), #{desc => ?DESC(publish_at)})},
{delayed_interval, mk(pos_integer(), #{desc => ?DESC(delayed_interval)})},
{delayed_remaining, mk(non_neg_integer(), #{desc => ?DESC(delayed_remaining)})},
{expected_at, mk(binary(), #{desc => ?DESC(expected_at)})},
{topic, mk(binary(), #{desc => ?DESC(topic), example => <<"/sys/#">>})},
{qos, mk(emqx_schema:qos(), #{desc => ?DESC(qos)})},
{from_clientid, mk(binary(), #{desc => ?DESC(from_clientid)})},
{from_username, mk(binary(), #{desc => ?DESC(from_username)})}
];
fields("message") ->
fields("message_without_payload") ++
[{payload, mk(binary(), #{desc => ?DESC(payload)})}].
%%--------------------------------------------------------------------
%% HTTP API
%%--------------------------------------------------------------------
status(get, _Params) ->
{200, get_status()};
status(put, #{body := Body}) ->
update_config(Body).
delayed_messages(get, #{query_string := Qs}) ->
{200, emqx_delayed:cluster_list(Qs)}.
delayed_message(get, #{bindings := #{node := NodeBin, msgid := HexId}}) ->
MaybeNode = make_maybe(NodeBin, invalid_node, fun erlang:binary_to_existing_atom/1),
MaybeId = make_maybe(HexId, id_schema_error, fun emqx_guid:from_hexstr/1),
with_maybe(
[MaybeNode, MaybeId],
fun(Node, Id) ->
case emqx_delayed:get_delayed_message(Node, Id) of
{ok, Message} ->
Payload = maps:get(payload, Message),
case erlang:byte_size(Payload) > ?MAX_PAYLOAD_LENGTH of
true ->
{200, Message#{payload => ?PAYLOAD_TOO_LARGE}};
_ ->
{200, Message#{payload => base64:encode(Payload)}}
end;
{error, not_found} ->
{404, generate_http_code_map(not_found, HexId)};
{badrpc, _} ->
{400, generate_http_code_map(invalid_node, NodeBin)}
end
end
);
delayed_message(delete, #{bindings := #{node := NodeBin, msgid := HexId}}) ->
MaybeNode = make_maybe(NodeBin, invalid_node, fun erlang:binary_to_atom/1),
MaybeId = make_maybe(HexId, id_schema_error, fun emqx_guid:from_hexstr/1),
with_maybe(
[MaybeNode, MaybeId],
fun(Node, Id) ->
case emqx_delayed:delete_delayed_message(Node, Id) of
ok ->
{204};
{error, not_found} ->
{404, generate_http_code_map(not_found, Id)}
end
end
).
%%--------------------------------------------------------------------
%% internal function
%%--------------------------------------------------------------------
get_status() ->
emqx_conf:get([delayed], #{}).
update_config(Config) ->
case generate_config(Config) of
{ok, Config} ->
update_config_(Config);
{error, {Code, Message}} ->
{400, #{code => Code, message => Message}}
end.
generate_config(Config) ->
generate_config(Config, [fun generate_max_delayed_messages/1]).
generate_config(Config, []) ->
{ok, Config};
generate_config(Config, [Fun | Tail]) ->
case Fun(Config) of
{ok, Config} ->
generate_config(Config, Tail);
{error, CodeMessage} ->
{error, CodeMessage}
end.
generate_max_delayed_messages(Config = #{<<"max_delayed_messages">> := Max}) when Max >= 0 ->
{ok, Config};
generate_max_delayed_messages(#{<<"max_delayed_messages">> := Max}) when Max < 0 ->
{error, {?BAD_REQUEST, <<"Max delayed must be equal or greater than 0">>}};
generate_max_delayed_messages(Config) ->
{ok, Config}.
update_config_(Config) ->
case emqx_delayed:update_config(Config) of
{ok, #{raw_config := NewDelayed}} ->
{200, NewDelayed};
{error, Reason} ->
Message = list_to_binary(
io_lib:format("Update config failed ~p", [Reason])
),
{500, ?INTERNAL_ERROR, Message}
end.
generate_http_code_map(id_schema_error, Id) ->
#{
code => ?MESSAGE_ID_SCHEMA_ERROR,
message =>
iolist_to_binary(io_lib:format("Message ID ~s schema error", [Id]))
};
generate_http_code_map(not_found, Id) ->
#{
code => ?MESSAGE_ID_NOT_FOUND,
message =>
iolist_to_binary(io_lib:format("Message ID ~s not found", [Id]))
};
generate_http_code_map(invalid_node, Node) ->
#{
code => ?INVALID_NODE,
message =>
iolist_to_binary(io_lib:format("The node name ~s is invalid", [Node]))
}.
make_maybe(X, Error, Fun) ->
try Fun(X) of
Right ->
Right
catch
_:_ ->
{left, X, Error}
end.
with_maybe(Maybes, Cont) ->
with_maybe(Maybes, Cont, []).
with_maybe([], Cont, Rights) ->
erlang:apply(Cont, lists:reverse(Rights));
with_maybe([{left, X, Error} | _], _Cont, _Rights) ->
{400, generate_http_code_map(Error, X)};
with_maybe([Right | T], Cont, Rights) ->
with_maybe(T, Cont, [Right | Rights]).