feat(delayed_api): support hocon schema

This commit is contained in:
zhongwencool 2021-09-22 18:49:39 +08:00
parent 357456880e
commit 8c441673c2
8 changed files with 249 additions and 148 deletions

View File

@ -159,11 +159,11 @@ fields("stats") ->
fields("authorization") ->
[ {"no_match",
sc(hoconsc:union([allow, deny]),
sc(hoconsc:enum([allow, deny]),
#{ default => allow
})}
, {"deny_action",
sc(hoconsc:union([ignore, disconnect]),
sc(hoconsc:enum([ignore, disconnect]),
#{ default => ignore
})}
, {"cache",
@ -297,7 +297,7 @@ fields("mqtt") ->
})
}
, {"mqueue_default_priority",
sc(union(highest, lowest),
sc(hoconsc:enum([highest, lowest]),
#{ default => lowest
})
}
@ -312,11 +312,11 @@ fields("mqtt") ->
})
}
, {"peer_cert_as_username",
sc(hoconsc:union([disabled, cn, dn, crt, pem, md5]),
sc(hoconsc:enum([disabled, cn, dn, crt, pem, md5]),
#{ default => disabled
})}
, {"peer_cert_as_clientid",
sc(hoconsc:union([disabled, cn, dn, crt, pem, md5]),
sc(hoconsc:enum([disabled, cn, dn, crt, pem, md5]),
#{ default => disabled
})}
];
@ -525,7 +525,7 @@ fields("ws_opts") ->
})
}
, {"mqtt_piggyback",
sc(hoconsc:union([single, multiple]),
sc(hoconsc:enum([single, multiple]),
#{ default => multiple
})
}
@ -653,7 +653,7 @@ fields(ssl_client_opts) ->
fields("deflate_opts") ->
[ {"level",
sc(hoconsc:union([none, default, best_compression, best_speed]),
sc(hoconsc:enum([none, default, best_compression, best_speed]),
#{})
}
, {"mem_level",
@ -662,15 +662,15 @@ fields("deflate_opts") ->
})
}
, {"strategy",
sc(hoconsc:union([default, filtered, huffman_only, rle]),
sc(hoconsc:enum([default, filtered, huffman_only, rle]),
#{})
}
, {"server_context_takeover",
sc(hoconsc:union([takeover, no_takeover]),
sc(hoconsc:enum([takeover, no_takeover]),
#{})
}
, {"client_context_takeover",
sc(hoconsc:union([takeover, no_takeover]),
sc(hoconsc:enum([takeover, no_takeover]),
#{})
}
, {"server_max_window_bits",
@ -709,12 +709,12 @@ fields("broker") ->
})
}
, {"session_locking_strategy",
sc(hoconsc:union([local, leader, quorum, all]),
sc(hoconsc:enum([local, leader, quorum, all]),
#{ default => quorum
})
}
, {"shared_subscription_strategy",
sc(hoconsc:union([random, round_robin]),
sc(hoconsc:enum([random, round_robin]),
#{ default => round_robin
})
}
@ -736,7 +736,7 @@ fields("broker") ->
fields("broker_perf") ->
[ {"route_lock_type",
sc(hoconsc:union([key, tab, global]),
sc(hoconsc:enum([key, tab, global]),
#{ default => key
})}
, {"trie_compaction",
@ -962,7 +962,7 @@ the file if it is to be added.
})
}
, {"verify",
sc(hoconsc:union([verify_peer, verify_none]),
sc(hoconsc:enum([verify_peer, verify_none]),
#{ default => Df("verify", verify_none)
})
}

View File

@ -22,7 +22,8 @@
-define(INIT_SCHEMA, #{fields => #{}, translations => #{}, validations => [], namespace => undefined}).
-define(TO_REF(_N_, _F_), iolist_to_binary([to_bin(_N_), ".", to_bin(_F_)])).
-define(TO_COMPONENTS(_M_, _F_), iolist_to_binary([<<"#/components/schemas/">>, ?TO_REF(namespace(_M_), _F_)])).
-define(TO_COMPONENTS_SCHEMA(_M_, _F_), iolist_to_binary([<<"#/components/schemas/">>, ?TO_REF(namespace(_M_), _F_)])).
-define(TO_COMPONENTS_PARAM(_M_, _F_), iolist_to_binary([<<"#/components/parameters/">>, ?TO_REF(namespace(_M_), _F_)])).
%% @equiv spec(Module, #{check_schema => false})
-spec(spec(module()) ->
@ -54,7 +55,6 @@ spec(Module, Options) ->
end, {[], []}, Paths),
{ApiSpec, components(lists:usort(AllRefs))}.
-spec(translate_req(#{binding => list(), query_string => list(), body => map()},
#{module => module(), path => string(), method => atom()}) ->
{ok, #{binding => list(), query_string => list(), body => map()}}|
@ -64,7 +64,7 @@ translate_req(Request, #{module := Module, path := Path, method := Method}) ->
try
Params = maps:get(parameters, Spec, []),
Body = maps:get(requestBody, Spec, []),
{Bindings, QueryStr} = check_parameters(Request, Params),
{Bindings, QueryStr} = check_parameters(Request, Params, Module),
NewBody = check_requestBody(Request, Body, Module, hoconsc:is_schema(Body)),
{ok, Request#{bindings => Bindings, query_string => QueryStr, body => NewBody}}
catch throw:Error ->
@ -93,23 +93,28 @@ parse_spec_ref(Module, Path) ->
maps:without([operationId], Schema)),
{maps:get(operationId, Schema), Specs, Refs}.
check_parameters(Request, Spec) ->
check_parameters(Request, Spec, Module) ->
#{bindings := Bindings, query_string := QueryStr} = Request,
BindingsBin = maps:fold(fun(Key, Value, Acc) -> Acc#{atom_to_binary(Key) => Value} end, #{}, Bindings),
check_parameter(Spec, BindingsBin, QueryStr, #{}, #{}).
check_parameter(Spec, BindingsBin, QueryStr, Module, #{}, #{}).
check_parameter([], _Bindings, _QueryStr, NewBindings, NewQueryStr) -> {NewBindings, NewQueryStr};
check_parameter([{Name, Type} | Spec], Bindings, QueryStr, BindingsAcc, QueryStrAcc) ->
check_parameter([?REF(Fields) | Spec], Bindings, QueryStr, LocalMod, BindingsAcc, QueryStrAcc) ->
check_parameter([?R_REF(LocalMod, Fields) | Spec], Bindings, QueryStr, LocalMod, BindingsAcc, QueryStrAcc);
check_parameter([?R_REF(Module, Fields) | Spec], Bindings, QueryStr, LocalMod, BindingsAcc, QueryStrAcc) ->
Params = apply(Module, fields, [Fields]),
check_parameter(Params ++ Spec, Bindings, QueryStr, LocalMod, BindingsAcc, QueryStrAcc);
check_parameter([], _Bindings, _QueryStr, _Module, NewBindings, NewQueryStr) -> {NewBindings, NewQueryStr};
check_parameter([{Name, Type} | Spec], Bindings, QueryStr, Module, BindingsAcc, QueryStrAcc) ->
Schema = ?INIT_SCHEMA#{roots => [{Name, Type}]},
case hocon_schema:field_schema(Type, in) of
path ->
NewBindings = hocon_schema:check_plain(Schema, Bindings, #{atom_key => true, override_env => false}),
NewBindingsAcc = maps:merge(BindingsAcc, NewBindings),
check_parameter(Spec, Bindings, QueryStr, NewBindingsAcc, QueryStrAcc);
check_parameter(Spec, Bindings, QueryStr, Module, NewBindingsAcc, QueryStrAcc);
query ->
NewQueryStr = hocon_schema:check_plain(Schema, QueryStr, #{override_env => false}),
NewQueryStrAcc = maps:merge(QueryStrAcc, NewQueryStr),
check_parameter(Spec, Bindings, QueryStr, BindingsAcc, NewQueryStrAcc)
check_parameter(Spec, Bindings, QueryStr, Module, BindingsAcc, NewQueryStrAcc)
end.
check_requestBody(#{body := Body}, Schema, Module, true) ->
@ -154,19 +159,28 @@ to_spec(Meta, Params, RequestBody, Responses) ->
parameters(Params, Module) ->
{SpecList, AllRefs} =
lists:foldl(fun({Name, Type}, {Acc, RefsAcc}) ->
In = hocon_schema:field_schema(Type, in),
In =:= undefined andalso throw({error, <<"missing in:path/query field in parameters">>}),
Nullable = hocon_schema:field_schema(Type, nullable),
Default = hocon_schema:field_schema(Type, default),
HoconType = hocon_schema:field_schema(Type, type),
Meta = init_meta(Nullable, Default),
{ParamType, Refs} = hocon_schema_to_spec(HoconType, Module),
Spec0 = init_prop([required | ?DEFAULT_FIELDS],
#{schema => maps:merge(ParamType, Meta), name => Name, in => In}, Type),
Spec1 = trans_required(Spec0, Nullable, In),
Spec2 = trans_desc(Spec1, Type),
{[Spec2 | Acc], Refs ++ RefsAcc}
lists:foldl(fun(Param, {Acc, RefsAcc}) ->
case Param of
?REF(StructName) ->
{[#{<<"$ref">> => ?TO_COMPONENTS_PARAM(Module, StructName)} |Acc],
[{Module, StructName, parameter}|RefsAcc]};
?R_REF(RModule, StructName) ->
{[#{<<"$ref">> => ?TO_COMPONENTS_PARAM(RModule, StructName)} |Acc],
[{RModule, StructName, parameter}|RefsAcc]};
{Name, Type} ->
In = hocon_schema:field_schema(Type, in),
In =:= undefined andalso throw({error, <<"missing in:path/query field in parameters">>}),
Nullable = hocon_schema:field_schema(Type, nullable),
Default = hocon_schema:field_schema(Type, default),
HoconType = hocon_schema:field_schema(Type, type),
Meta = init_meta(Nullable, Default),
{ParamType, Refs} = hocon_schema_to_spec(HoconType, Module),
Spec0 = init_prop([required | ?DEFAULT_FIELDS],
#{schema => maps:merge(ParamType, Meta), name => Name, in => In}, Type),
Spec1 = trans_required(Spec0, Nullable, In),
Spec2 = trans_desc(Spec1, Type),
{[Spec2 | Acc], Refs ++ RefsAcc}
end
end, {[], []}, Params),
{lists:reverse(SpecList), AllRefs}.
@ -196,7 +210,7 @@ trans_required(Spec, _, _) -> Spec.
trans_desc(Spec, Hocon) ->
case hocon_schema:field_schema(Hocon, desc) of
undefined -> Spec;
Desc -> Spec#{description => Desc}
Desc -> Spec#{description => to_bin(Desc)}
end.
requestBody([], _Module) -> {[], []};
@ -248,6 +262,13 @@ components([{Module, Field} | Refs], SpecAcc, SubRefsAcc) ->
Namespace = namespace(Module),
{Object, SubRefs} = parse_object(Props, Module),
NewSpecAcc = SpecAcc#{?TO_REF(Namespace, Field) => Object},
components(Refs, NewSpecAcc, SubRefs ++ SubRefsAcc);
%% parameters in ref only have one value, not array
components([{Module, Field, parameter} | Refs], SpecAcc, SubRefsAcc) ->
Props = apply(Module, fields, [Field]),
{[Param], SubRefs} = parameters(Props, Module),
Namespace = namespace(Module),
NewSpecAcc = SpecAcc#{?TO_REF(Namespace, Field) => Param},
components(Refs, NewSpecAcc, SubRefs ++ SubRefsAcc).
namespace(Module) ->
@ -257,10 +278,10 @@ namespace(Module) ->
end.
hocon_schema_to_spec(?R_REF(Module, StructName), _LocalModule) ->
{#{<<"$ref">> => ?TO_COMPONENTS(Module, StructName)},
{#{<<"$ref">> => ?TO_COMPONENTS_SCHEMA(Module, StructName)},
[{Module, StructName}]};
hocon_schema_to_spec(?REF(StructName), LocalModule) ->
{#{<<"$ref">> => ?TO_COMPONENTS(LocalModule, StructName)},
{#{<<"$ref">> => ?TO_COMPONENTS_SCHEMA(LocalModule, StructName)},
[{LocalModule, StructName}]};
hocon_schema_to_spec(Type, _LocalModule) when ?IS_TYPEREFL(Type) ->
{typename_to_spec(typerefl:name(Type)), []};

View File

@ -0,0 +1,46 @@
-module(emqx_swagger).
-include_lib("typerefl/include/types.hrl").
-import(hoconsc, [mk/2]).
-define(MAX_ROW_LIMIT, 100).
%% API
-export([fields/1]).
-export([error_codes/1, error_codes/2]).
fields(page) ->
[{page,
mk(integer(),
#{
in => query,
desc => <<"Page number of the results to fetch.">>,
default => 1,
example => 1})
}];
fields(limit) ->
[{limit,
mk(range(1, ?MAX_ROW_LIMIT),
#{
in => query,
desc => iolist_to_binary([<<"Results per page(max ">>,
integer_to_binary(?MAX_ROW_LIMIT), <<")">>]),
default => ?MAX_ROW_LIMIT,
example => 50
})
}].
error_codes(Codes) ->
error_codes(Codes, <<"Error code to troubleshoot problems.">>).
error_codes(Codes = [_ | _], MsgExample) ->
[code(Codes), message(MsgExample)].
message(Example) ->
{message, mk(string(), #{
desc => <<"Detailed description of the error.">>,
example => Example
})}.
code(Codes) ->
{code, mk(hoconsc:enum(Codes), #{})}.

View File

@ -1,13 +0,0 @@
%%%-------------------------------------------------------------------
%%% @author zhongwen
%%% @copyright (C) 2021, <COMPANY>
%%% @doc
%%%
%%% @end
%%% Created : 22. 9 2021 13:38
%%%-------------------------------------------------------------------
-module(emqx_swagger_util).
-author("zhongwen").
%% API
-export([]).

View File

@ -3,10 +3,10 @@
-behaviour(hocon_schema).
%% API
-export([paths/0, api_spec/0, schema/1]).
-export([t_in_path/1, t_in_query/1, t_in_mix/1, t_without_in/1]).
-export([paths/0, api_spec/0, schema/1, fields/1]).
-export([t_in_path/1, t_in_query/1, t_in_mix/1, t_without_in/1, t_ref/1]).
-export([t_require/1, t_nullable/1, t_method/1, t_api_spec/1]).
-export([t_in_path_trans/1, t_in_query_trans/1, t_in_mix_trans/1]).
-export([t_in_path_trans/1, t_in_query_trans/1, t_in_mix_trans/1, t_ref_trans/1]).
-export([t_in_path_trans_error/1, t_in_query_trans_error/1, t_in_mix_trans_error/1]).
-export([all/0, suite/0, groups/0]).
@ -20,9 +20,9 @@
all() -> [{group, spec}, {group, validation}].
suite() -> [{timetrap, {minutes, 1}}].
groups() -> [
{spec, [parallel], [t_api_spec, t_in_path, t_in_query, t_in_mix,
{spec, [parallel], [t_api_spec, t_in_path, t_ref, t_in_query, t_in_mix,
t_without_in, t_require, t_nullable, t_method]},
{validation, [parallel], [t_in_path_trans, t_in_query_trans, t_in_mix_trans,
{validation, [parallel], [t_in_path_trans, t_ref_trans, t_in_query_trans, t_in_mix_trans,
t_in_path_trans_error, t_in_query_trans_error, t_in_mix_trans_error]}
].
@ -44,6 +44,18 @@ t_in_query(_Config) ->
validate("/test/in/query", Expect),
ok.
t_ref(_Config) ->
LocalPath = "/test/in/ref/local",
Path = "/test/in/ref",
Expect = [#{<<"$ref">> => <<"#/components/parameters/emqx_swagger_parameter_SUITE.page">>}],
{OperationId, Spec, Refs} = emqx_dashboard_swagger:parse_spec_ref(?MODULE, Path),
{OperationId, Spec, Refs} = emqx_dashboard_swagger:parse_spec_ref(?MODULE, LocalPath),
?assertEqual(test, OperationId),
Params = maps:get(parameters, maps:get(post, Spec)),
?assertEqual(Expect, Params),
?assertEqual([{?MODULE, page, parameter}], Refs),
ok.
t_in_mix(_Config) ->
Expect =
[#{description => <<"Indicates which sorts of issues to return">>,
@ -115,6 +127,18 @@ t_in_query_trans(_Config) ->
?assertEqual(Expect, trans_parameters(Path, #{}, #{<<"per_page">> => 100})),
ok.
t_ref_trans(_Config) ->
LocalPath = "/test/in/ref/local",
Path = "/test/in/ref",
Expect = {ok, #{bindings => #{},body => #{},
query_string => #{<<"per_page">> => 100}}},
?assertEqual(Expect, trans_parameters(Path, #{}, #{<<"per_page">> => 100})),
?assertEqual(Expect, trans_parameters(LocalPath, #{}, #{<<"per_page">> => 100})),
{400,'BAD_REQUEST', Reason} = trans_parameters(Path, #{}, #{<<"per_page">> => 1010}),
?assertNotEqual(nomatch, binary:match(Reason, [<<"per_page">>])),
{400,'BAD_REQUEST', Reason} = trans_parameters(LocalPath, #{}, #{<<"per_page">> => 1010}),
ok.
t_in_mix_trans(_Config) ->
Path = "/test/in/mix/:state",
Bindings = #{
@ -186,7 +210,7 @@ trans_parameters(Path, Bindings, QueryStr) ->
api_spec() -> emqx_dashboard_swagger:spec(?MODULE).
paths() -> ["/test/in/:filter", "/test/in/query", "/test/in/mix/:state",
paths() -> ["/test/in/:filter", "/test/in/query", "/test/in/mix/:state", "/test/in/ref",
"/required/false", "/nullable/false", "/nullable/true", "/method/ok"].
schema("/test/in/:filter") ->
@ -213,6 +237,22 @@ schema("/test/in/query") ->
responses => #{200 => <<"ok">>}
}
};
schema("/test/in/ref/local") ->
#{
operationId => test,
post => #{
parameters => [hoconsc:ref(page)],
responses => #{200 => <<"ok">>}
}
};
schema("/test/in/ref") ->
#{
operationId => test,
post => #{
parameters => [hoconsc:ref(?MODULE, page)],
responses => #{200 => <<"ok">>}
}
};
schema("/test/in/mix/:state") ->
#{
operationId => test,
@ -257,6 +297,13 @@ schema("/method/ok") ->
#{operationId => test}, ?METHODS);
schema("/method/error") ->
#{operationId => test, bar => #{200 => <<"ok">>}}.
fields(page) ->
[
{per_page,
mk(range(1, 100),
#{in => query, desc => <<"results per page (max 100)">>, example => 1})}
].
to_schema(Params) ->
#{
operationId => test,

View File

@ -102,7 +102,7 @@ fields("cluster") ->
, default => emqxcl
})}
, {"discovery_strategy",
sc(union([manual, static, mcast, dns, etcd, k8s]),
sc(hoconsc:enum([manual, static, mcast, dns, etcd, k8s]),
#{ default => manual
})}
, {"autoclean",
@ -122,7 +122,7 @@ fields("cluster") ->
sc(ref(cluster_mcast),
#{})}
, {"proto_dist",
sc(union([inet_tcp, inet6_tcp, inet_tls]),
sc(hoconsc:enum([inet_tcp, inet6_tcp, inet_tls]),
#{ mapping => "ekka.proto_dist"
, default => inet_tcp
})}
@ -136,7 +136,7 @@ fields("cluster") ->
sc(ref(cluster_k8s),
#{})}
, {"db_backend",
sc(union([mnesia, rlog]),
sc(hoconsc:enum([mnesia, rlog]),
#{ mapping => "ekka.db_backend"
, default => mnesia
})}
@ -224,7 +224,7 @@ fields(cluster_k8s) ->
#{ default => "emqx"
})}
, {"address_type",
sc(union([ip, dns, hostname]),
sc(hoconsc:enum([ip, dns, hostname]),
#{})}
, {"app_name",
sc(string(),
@ -242,7 +242,7 @@ fields(cluster_k8s) ->
fields("rlog") ->
[ {"role",
sc(union([core, replicant]),
sc(hoconsc:enum([core, replicant]),
#{ mapping => "ekka.node_role"
, default => core
})}
@ -334,7 +334,7 @@ fields("cluster_call") ->
fields("rpc") ->
[ {"mode",
sc(union(sync, async),
sc(hoconsc:enum([sync, async]),
#{ default => async
})}
, {"async_batch_size",
@ -343,7 +343,7 @@ fields("rpc") ->
, default => 256
})}
, {"port_discovery",
sc(union(manual, stateless),
sc(hoconsc:enum([manual, stateless]),
#{ mapping => "gen_rpc.port_discovery"
, default => stateless
})}
@ -434,7 +434,7 @@ fields("log_file_handler") ->
sc(ref("log_rotation"),
#{})}
, {"max_size",
sc(union([infinity, emqx_schema:bytesize()]),
sc(hoconsc:union([infinity, emqx_schema:bytesize()]),
#{ default => "10MB"
})}
] ++ log_handler_common_confs();
@ -464,7 +464,7 @@ fields("log_overload_kill") ->
#{ default => 20000
})}
, {"restart_after",
sc(union(emqx_schema:duration(), infinity),
sc(hoconsc:union([emqx_schema:duration(), infinity]),
#{ default => "5s"
})}
];
@ -582,7 +582,7 @@ log_handler_common_confs() ->
#{ default => unlimited
})}
, {"formatter",
sc(union([text, json]),
sc(hoconsc:enum([text, json]),
#{ default => text
})}
, {"single_line",
@ -608,11 +608,11 @@ log_handler_common_confs() ->
sc(ref("log_burst_limit"),
#{})}
, {"supervisor_reports",
sc(union([error, progress]),
sc(hoconsc:enum([error, progress]),
#{ default => error
})}
, {"max_depth",
sc(union([unlimited, integer()]),
sc(hoconsc:union([unlimited, integer()]),
#{ default => 100
})}
].

View File

@ -18,22 +18,19 @@
-behavior(minirest_api).
-import(emqx_mgmt_util, [ page_params/0
, schema/1
, schema/2
, object_schema/2
, error_schema/2
, page_object_schema/1
, properties/1
]).
-include_lib("typerefl/include/types.hrl").
-import(hoconsc, [mk/2, ref/1, ref/2]).
-define(MAX_PAYLOAD_LENGTH, 2048).
-define(PAYLOAD_TOO_LARGE, 'PAYLOAD_TOO_LARGE').
-export([ status/2
, delayed_messages/2
, delayed_message/2
]).
-export([status/2
, delayed_messages/2
, delayed_message/2
]).
-export([paths/0, fields/1, schema/1]).
%% for rpc
-export([update_config_/1]).
@ -49,91 +46,94 @@
-define(MESSAGE_ID_SCHEMA_ERROR, 'MESSAGE_ID_SCHEMA_ERROR').
api_spec() ->
{
[status_api(), delayed_messages_api(), delayed_message_api()],
[]
}.
emqx_dashboard_swagger:spec(?MODULE).
conf_schema() ->
emqx_mgmt_api_configs:gen_schema(emqx:get_raw_config([delayed])).
properties() ->
PayloadDesc = io_lib:format("Payload, base64 encode. Payload will be ~p if length large than ~p",
[?PAYLOAD_TOO_LARGE, ?MAX_PAYLOAD_LENGTH]),
properties([
{msgid, integer, <<"Message Id">>},
{publish_at, string, <<"Client publish message time, rfc 3339">>},
{delayed_interval, integer, <<"Delayed interval, second">>},
{delayed_remaining, integer, <<"Delayed remaining, second">>},
{expected_at, string, <<"Expect publish time, rfc 3339">>},
{topic, string, <<"Topic">>},
{qos, string, <<"QoS">>},
{payload, string, iolist_to_binary(PayloadDesc)},
{from_clientid, string, <<"From ClientId">>},
{from_username, string, <<"From Username">>}
]).
paths() -> ["/mqtt/delayed", "/mqtt/delayed/messages", "/mqtt/delayed/messages/:msgid"].
parameters() ->
[#{
name => msgid,
in => path,
schema => #{type => string},
required => true
}].
status_api() ->
Metadata = #{
schema("/mqtt/delayed") ->
#{
operationId => status,
get => #{
tags => [<<"mqtt">>],
description => <<"Get delayed status">>,
summary => <<"Get delayed status">>,
responses => #{
<<"200">> => schema(conf_schema())}
},
200 => ref(emqx_modules_schema, "delayed")
}
},
put => #{
tags => [<<"mqtt">>],
description => <<"Enable or disable delayed, set max delayed messages">>,
'requestBody' => schema(conf_schema()),
requestBody => ref(emqx_modules_schema, "delayed"),
responses => #{
<<"200">> =>
schema(conf_schema(), <<"Enable or disable delayed successfully">>),
<<"400">> =>
error_schema(<<"Max limit illegality">>, [?BAD_REQUEST])
200 => mk(ref(emqx_modules_schema, "delayed"),
#{desc => <<"Enable or disable delayed successfully">>}),
400 => emqx_swagger:error_codes([?BAD_REQUEST], <<"Max limit illegality">>)
}
}
},
{"/mqtt/delayed", Metadata, status}.
};
delayed_messages_api() ->
Metadata = #{
get => #{
description => "List delayed messages",
parameters => page_params(),
responses => #{
<<"200">> => page_object_schema(properties())
}
}
},
{"/mqtt/delayed/messages", Metadata, delayed_messages}.
delayed_message_api() ->
Metadata = #{
schema("/mqtt/delayed/messages/:msgid") ->
#{operationId => delayed_messages,
get => #{
tags => [<<"mqtt">>],
description => <<"Get delayed message">>,
parameters => parameters(),
parameters => [{msgid, mk(binary(), #{in => path, desc => <<"delay message ID">>})}],
responses => #{
<<"400">> => error_schema(<<"Message ID Schema error">>, [?MESSAGE_ID_SCHEMA_ERROR]),
<<"404">> => error_schema(<<"Message ID not found">>, [?MESSAGE_ID_NOT_FOUND]),
<<"200">> => object_schema(maps:without([payload], properties()), <<"Get delayed message success">>)
200 => ref("message_without_payload"),
400 => emqx_swagger:error_codes([?MESSAGE_ID_SCHEMA_ERROR], <<"Bad MsgId format">>),
404 => emqx_swagger:error_codes([?MESSAGE_ID_NOT_FOUND], <<"MsgId not found">>)
}
},
delete => #{
tags => [<<"mqtt">>],
description => <<"Delete delayed message">>,
parameters => parameters(),
parameters => [{msgid, mk(binary(), #{in => path, desc => <<"delay message ID">>})}],
responses => #{
<<"400">> => error_schema(<<"Message ID Schema error">>, [?MESSAGE_ID_SCHEMA_ERROR]),
<<"404">> => error_schema(<<"Message ID not found">>, [?MESSAGE_ID_NOT_FOUND]),
<<"200">> => schema(<<"Delete delayed message success">>)
200 => <<"Delete delayed message success">>,
400 => emqx_swagger:error_codes([?MESSAGE_ID_SCHEMA_ERROR], <<"Bad MsgId format">>),
404 => emqx_swagger:error_codes([?MESSAGE_ID_NOT_FOUND], <<"MsgId not found">>)
}
}
},
{"/mqtt/delayed/messages/:msgid", Metadata, delayed_message}.
};
schema("/mqtt/delayed/messages") ->
#{
operationId => delayed_messages,
get => #{
tags => [<<"mqtt">>],
description => <<"List delayed messages">>,
parameters => [ref(emqx_swagger, page), ref(emqx_swagger, limit)],
responses => #{
200 =>
[
{data, mk(hoconsc:array(ref("message")), #{})},
{meta, [
{page, mk(integer(), #{})},
{limit, mk(integer(), #{})},
{count, mk(integer(), #{})}
]}
]
}
}
}.
fields("message_without_payload") ->
[
{msgid, mk(integer(), #{desc => <<"Message Id (MQTT message id hash)">>})},
{publish_at, mk(binary(), #{desc => <<"Client publish message time, rfc 3339">>})},
{delayed_interval, mk(integer(), #{desc => <<"Delayed interval, second">>})},
{delayed_remaining, mk(integer(), #{desc => <<"Delayed remaining, second">>})},
{expected_at, mk(binary(), #{desc => <<"Expect publish time, rfc 3339">>})},
{topic, mk(binary(), #{desc => <<"Topic">>, example => <<"/sys/#">>})},
{qos, mk(binary(), #{desc => <<"QoS">>})},
{from_clientid, mk(binary(), #{desc => <<"From ClientId">>})},
{from_username, mk(binary(), #{desc => <<"From Username">>})}
];
fields("message") ->
PayloadDesc = io_lib:format("Payload, base64 encode. Payload will be ~p if length large than ~p",
[?PAYLOAD_TOO_LARGE, ?MAX_PAYLOAD_LENGTH]),
fields("message_without_payload") ++
[{payload, mk(binary(), #{desc => iolist_to_binary(PayloadDesc)})}].
%%--------------------------------------------------------------------
%% HTTP API
@ -210,7 +210,7 @@ generate_max_delayed_messages(Config) ->
update_config_(Config) ->
lists:foreach(fun(Node) ->
update_config_(Node, Config)
end, ekka_mnesia:running_nodes()).
end, ekka_mnesia:running_nodes()).
update_config_(Node, Config) when Node =:= node() ->
_ = emqx_delayed:update_config(Config),

View File

@ -52,7 +52,7 @@
, {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.8.2"}}}
, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.10.8"}}}
, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}}
, {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.2.4"}}}
, {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.2.5"}}}
, {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.1"}}}
, {replayq, "0.3.3"}
, {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}}