307 lines
10 KiB
Erlang
307 lines
10 KiB
Erlang
%%--------------------------------------------------------------------
|
|
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
%%
|
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
%% you may not use this file except in compliance with the License.
|
|
%% You may obtain a copy of the License at
|
|
%%
|
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
%%
|
|
%% Unless required by applicable law or agreed to in writing, software
|
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
%% See the License for the specific language governing permissions and
|
|
%% limitations under the License.
|
|
%%--------------------------------------------------------------------
|
|
|
|
-module(emqx_delayed_api).
|
|
|
|
-behaviour(minirest_api).
|
|
|
|
-include_lib("typerefl/include/types.hrl").
|
|
-include_lib("hocon/include/hoconsc.hrl").
|
|
-include("emqx_modules.hrl").
|
|
|
|
-import(hoconsc, [mk/2, ref/1, ref/2]).
|
|
|
|
-export([
|
|
status/2,
|
|
delayed_messages/2,
|
|
delayed_message/2
|
|
]).
|
|
|
|
-export([
|
|
paths/0,
|
|
fields/1,
|
|
schema/1
|
|
]).
|
|
|
|
%% for rpc
|
|
-export([update_config_/1]).
|
|
|
|
-export([api_spec/0]).
|
|
|
|
-define(MAX_PAYLOAD_LENGTH, 2048).
|
|
-define(PAYLOAD_TOO_LARGE, <<"PAYLOAD_TOO_LARGE">>).
|
|
|
|
-define(INTERNAL_ERROR, 'INTERNAL_ERROR').
|
|
-define(BAD_REQUEST, 'BAD_REQUEST').
|
|
|
|
-define(MESSAGE_ID_NOT_FOUND, 'MESSAGE_ID_NOT_FOUND').
|
|
-define(MESSAGE_ID_SCHEMA_ERROR, 'MESSAGE_ID_SCHEMA_ERROR').
|
|
-define(INVALID_NODE, 'INVALID_NODE').
|
|
|
|
%% Build this module's API spec by handing the module name to
%% emqx_dashboard_swagger:spec/2 together with the `check_schema => true` option.
api_spec() ->
    SpecOpts = #{check_schema => true},
    emqx_dashboard_swagger:spec(?MODULE, SpecOpts).
|
|
|
|
%% List the endpoint paths handled by this module.
%% Each entry has a matching schema/1 clause.
paths() ->
    StatusPath = "/mqtt/delayed",
    MessagesPath = "/mqtt/delayed/messages",
    MessagePath = "/mqtt/delayed/messages/:node/:msgid",
    [StatusPath, MessagesPath, MessagePath].
|
|
|
|
%% Describe the operations available on each path returned by paths/0.
%% Every clause returns a map holding the 'operationId' (the name of the
%% exported handler function in this module) and one entry per HTTP method.
schema("/mqtt/delayed") ->
    GetOp = #{
        tags => ?API_TAG_MQTT,
        description => ?DESC(view_status_api),
        responses => #{200 => ref(emqx_modules_schema, "delayed")}
    },
    PutOp = #{
        tags => ?API_TAG_MQTT,
        description => ?DESC(update_api),
        'requestBody' => ref(emqx_modules_schema, "delayed"),
        responses => #{
            200 => mk(
                ref(emqx_modules_schema, "delayed"),
                #{desc => ?DESC(update_success)}
            ),
            400 => emqx_dashboard_swagger:error_codes(
                [?BAD_REQUEST],
                ?DESC(illegality_limit)
            )
        }
    },
    #{'operationId' => status, get => GetOp, put => PutOp};
schema("/mqtt/delayed/messages/:node/:msgid") ->
    GetResponses = maps:put(
        200,
        ref("message_without_payload"),
        schema_msgid_error_responses()
    ),
    DeleteResponses = maps:put(
        204,
        <<"Delete delayed message success">>,
        schema_msgid_error_responses()
    ),
    #{
        'operationId' => delayed_message,
        get => #{
            tags => ?API_TAG_MQTT,
            description => ?DESC(get_message_api),
            parameters => schema_msgid_path_parameters(),
            responses => GetResponses
        },
        delete => #{
            tags => ?API_TAG_MQTT,
            description => ?DESC(delete_api),
            parameters => schema_msgid_path_parameters(),
            responses => DeleteResponses
        }
    };
schema("/mqtt/delayed/messages") ->
    Paging = [ref(emqx_dashboard_swagger, page), ref(emqx_dashboard_swagger, limit)],
    MetaFields = [
        {page, mk(pos_integer(), #{desc => ?DESC(view_page)})},
        {limit, mk(pos_integer(), #{desc => ?DESC(view_limit)})},
        {count, mk(non_neg_integer(), #{desc => ?DESC(count)})}
    ],
    ListBody = [
        {data, mk(hoconsc:array(ref("message")), #{})},
        {meta, MetaFields}
    ],
    #{
        'operationId' => delayed_messages,
        get => #{
            tags => ?API_TAG_MQTT,
            description => ?DESC(list_api),
            parameters => Paging,
            responses => #{200 => ListBody}
        }
    }.

%% Path parameters shared by GET and DELETE on a single delayed message.
schema_msgid_path_parameters() ->
    [
        {node, mk(binary(), #{in => path, desc => ?DESC(node)})},
        {msgid, mk(binary(), #{in => path, desc => ?DESC(msgid)})}
    ].

%% 400/404 responses shared by GET and DELETE on a single delayed message.
schema_msgid_error_responses() ->
    #{
        400 => emqx_dashboard_swagger:error_codes(
            [?MESSAGE_ID_SCHEMA_ERROR, ?INVALID_NODE],
            ?DESC(bad_msgid_format)
        ),
        404 => emqx_dashboard_swagger:error_codes(
            [?MESSAGE_ID_NOT_FOUND],
            ?DESC(msgid_not_found)
        )
    }.
|
|
|
|
%% Field definitions for the structs referenced from schema/1.
%% "message" is "message_without_payload" with a trailing `payload` field.
fields("message") ->
    BaseFields = fields("message_without_payload"),
    PayloadField = {payload, mk(binary(), #{desc => ?DESC(payload)})},
    BaseFields ++ [PayloadField];
fields("message_without_payload") ->
    [
        {msgid, mk(integer(), #{desc => ?DESC(msgid)})},
        {node, mk(binary(), #{desc => ?DESC(node)})},
        {publish_at, mk(binary(), #{desc => ?DESC(publish_at)})},
        {delayed_interval, mk(pos_integer(), #{desc => ?DESC(delayed_interval)})},
        {delayed_remaining, mk(non_neg_integer(), #{desc => ?DESC(delayed_remaining)})},
        {expected_at, mk(binary(), #{desc => ?DESC(expected_at)})},
        {topic, mk(binary(), #{desc => ?DESC(topic), example => <<"/sys/#">>})},
        {qos, mk(emqx_schema:qos(), #{desc => ?DESC(qos)})},
        {from_clientid, mk(binary(), #{desc => ?DESC(from_clientid)})},
        {from_username, mk(binary(), #{desc => ?DESC(from_username)})}
    ].
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% HTTP API
|
|
%%--------------------------------------------------------------------
|
|
%% Handler for "/mqtt/delayed".
%% GET replies 200 with the value of get_status/0; PUT hands the request
%% body to update_config/1 and returns whatever that produces.
status(get, _Request) ->
    CurrentConf = get_status(),
    {200, CurrentConf};
status(put, #{body := NewConf}) ->
    update_config(NewConf).
|
|
|
|
%% Handler for "/mqtt/delayed/messages".
%% Passes the request's query string to emqx_delayed:cluster_list/1 and
%% replies 200 with its result.
delayed_messages(get, #{query_string := QueryString}) ->
    Listing = emqx_delayed:cluster_list(QueryString),
    {200, Listing}.
|
|
|
|
%% Handler for "/mqtt/delayed/messages/:node/:msgid".
%%
%% Both clauses first convert the two path bindings:
%%   * `node`  -> atom, via binary_to_existing_atom/1 (failure => invalid_node)
%%   * `msgid` -> whatever emqx_guid:from_hexstr/1 returns (failure => id_schema_error)
%% A failed conversion is turned into a 400 reply by with_maybe/2 before the
%% continuation runs.
%%
%% GET replies:
%%   200 with the message; the payload is base64-encoded, or replaced by
%%       ?PAYLOAD_TOO_LARGE when it is longer than ?MAX_PAYLOAD_LENGTH bytes
%%   404 when emqx_delayed reports {error, not_found}
%%   400 when the call yields {badrpc, _}
%%
%% DELETE replies:
%%   204 on success
%%   404 when emqx_delayed reports {error, not_found}
%%   400 when the call yields {badrpc, _}
delayed_message(get, #{bindings := #{node := NodeBin, msgid := HexId}}) ->
    MaybeNode = make_maybe(NodeBin, invalid_node, fun erlang:binary_to_existing_atom/1),
    MaybeId = make_maybe(HexId, id_schema_error, fun emqx_guid:from_hexstr/1),
    with_maybe(
        [MaybeNode, MaybeId],
        fun(Node, Id) ->
            case emqx_delayed:get_delayed_message(Node, Id) of
                {ok, Message} ->
                    Payload = maps:get(payload, Message),
                    case erlang:byte_size(Payload) > ?MAX_PAYLOAD_LENGTH of
                        true ->
                            {200, Message#{payload => ?PAYLOAD_TOO_LARGE}};
                        _ ->
                            {200, Message#{payload => base64:encode(Payload)}}
                    end;
                {error, not_found} ->
                    {404, generate_http_code_map(not_found, HexId)};
                {badrpc, _} ->
                    {400, generate_http_code_map(invalid_node, NodeBin)}
            end
        end
    );
delayed_message(delete, #{bindings := #{node := NodeBin, msgid := HexId}}) ->
    %% The node name comes straight from the URL. Atoms are never garbage
    %% collected, so only accept names that already exist as atoms instead of
    %% creating a new atom per request (same conversion as the GET clause).
    MaybeNode = make_maybe(NodeBin, invalid_node, fun erlang:binary_to_existing_atom/1),
    MaybeId = make_maybe(HexId, id_schema_error, fun emqx_guid:from_hexstr/1),
    with_maybe(
        [MaybeNode, MaybeId],
        fun(Node, Id) ->
            case emqx_delayed:delete_delayed_message(Node, Id) of
                ok ->
                    {204};
                {error, not_found} ->
                    %% Echo the hex id the client sent, not the decoded Id.
                    {404, generate_http_code_map(not_found, HexId)};
                {badrpc, _} ->
                    %% Mirror the GET clause instead of crashing with
                    %% case_clause when the remote call fails.
                    {400, generate_http_code_map(invalid_node, NodeBin)}
            end
        end
    ).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Internal functions
|
|
%%--------------------------------------------------------------------
|
|
%% Read the value stored under the [delayed] config path through emqx_conf,
%% falling back to an empty map.
get_status() ->
    ConfPath = [delayed],
    emqx_conf:get(ConfPath, #{}).
|
|
|
|
%% Validate the request body with generate_config/1 and, when it passes,
%% apply it with update_config_/1.
%%
%% Returns the reply of update_config_/1 on success, or
%% {400, #{code => Code, message => Message}} when validation fails.
update_config(Config) ->
    case generate_config(Config) of
        %% Bind a fresh variable: matching on the already-bound Config would
        %% raise case_clause as soon as a validator returned a changed map.
        {ok, CheckedConfig} ->
            update_config_(CheckedConfig);
        {error, {Code, Message}} ->
            {400, #{code => Code, message => Message}}
    end.
|
|
%% Run the request body through the list of validators used by this API
%% (currently only generate_max_delayed_messages/1).
generate_config(Config) ->
    Validators = [fun generate_max_delayed_messages/1],
    generate_config(Config, Validators).
|
|
|
|
%% Apply each validator fun to the config in order.
%%
%% A validator returns {ok, Config1} to continue (Config1 is what the next
%% validator receives) or {error, CodeMessage} to stop; the first error is
%% returned unchanged and later validators are not called.
%% Returns {ok, FinalConfig} when every validator succeeds.
generate_config(Config, []) ->
    {ok, Config};
generate_config(Config, [Fun | Tail]) ->
    case Fun(Config) of
        %% Fresh variable on purpose: matching on the bound Config would turn
        %% any validator that adjusts the config into a case_clause crash.
        {ok, NewConfig} ->
            generate_config(NewConfig, Tail);
        {error, _CodeMessage} = Error ->
            Error
    end.
|
|
|
|
%% Validator for the <<"max_delayed_messages">> key.
%% Rejects the config only when the key is present with a value below 0;
%% every other input (key absent, or value not below 0) is passed through
%% unchanged as {ok, Config}.
generate_max_delayed_messages(#{<<"max_delayed_messages">> := Limit}) when Limit < 0 ->
    {error, {?BAD_REQUEST, <<"Max delayed must be equal or greater than 0">>}};
generate_max_delayed_messages(Config) ->
    {ok, Config}.
|
|
|
|
%% Apply the config through emqx_delayed:update_config/1 (exported for rpc).
%%
%% Replies {200, RawConfig} with the `raw_config` of a successful update, or
%% {500, ?INTERNAL_ERROR, Message} where Message is the formatted reason.
update_config_(Config) ->
    Outcome = emqx_delayed:update_config(Config),
    case Outcome of
        {error, Reason} ->
            Text = io_lib:format("Update config failed ~p", [Reason]),
            {500, ?INTERNAL_ERROR, iolist_to_binary(Text)};
        {ok, #{raw_config := NewDelayed}} ->
            {200, NewDelayed}
    end.
|
|
|
|
%% Build the `#{code, message}` body of an error reply.
%% The first argument selects the error code and message template; the
%% second is the offending value, rendered into the message with ~s.
generate_http_code_map(id_schema_error, Id) ->
    http_code_map(?MESSAGE_ID_SCHEMA_ERROR, "Message ID ~s schema error", Id);
generate_http_code_map(not_found, Id) ->
    http_code_map(?MESSAGE_ID_NOT_FOUND, "Message ID ~s not found", Id);
generate_http_code_map(invalid_node, Node) ->
    http_code_map(?INVALID_NODE, "The node name ~s is invalid", Node).

%% Render Format with the single argument Arg and pair it with Code.
http_code_map(Code, Format, Arg) ->
    Text = io_lib:format(Format, [Arg]),
    #{code => Code, message => iolist_to_binary(Text)}.
|
|
|
|
%% Apply Fun to X at the request boundary.
%% Returns Fun's result as-is when it succeeds; any exception (throw, error
%% or exit) is converted into {left, X, Error} so the caller can turn it into
%% an error reply.
make_maybe(X, Error, Fun) ->
    try
        Fun(X)
    catch
        _Class:_Reason ->
            {left, X, Error}
    end.
|
|
|
|
%% Entry point: run with_maybe/3 with an empty accumulator.
with_maybe(Maybes, Cont) ->
    Collected = [],
    with_maybe(Maybes, Cont, Collected).
|
|
|
|
%% Walk the list of make_maybe/3 results from left to right.
%% The first {left, X, Error} element stops the walk and yields a 400 reply
%% built by generate_http_code_map/2; if there is none, Cont is applied to
%% all values in their original order.
with_maybe([], Cont, Rights) ->
    Args = lists:reverse(Rights),
    erlang:apply(Cont, Args);
with_maybe([Head | Rest], Cont, Rights) ->
    case Head of
        {left, X, Error} ->
            {400, generate_http_code_map(Error, X)};
        _ ->
            with_maybe(Rest, Cont, [Head | Rights])
    end.
|