Merge pull request #5784 from zhongwencool/delayed-api-schema

feat(delayed_api): support hocon schema
This commit is contained in:
zhongwencool 2021-09-27 13:45:17 +08:00 committed by GitHub
commit 5280f83651
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 335 additions and 172 deletions

View File

@ -159,11 +159,11 @@ fields("stats") ->
fields("authorization") ->
[ {"no_match",
sc(hoconsc:union([allow, deny]),
sc(hoconsc:enum([allow, deny]),
#{ default => allow
})}
, {"deny_action",
sc(hoconsc:union([ignore, disconnect]),
sc(hoconsc:enum([ignore, disconnect]),
#{ default => ignore
})}
, {"cache",
@ -297,7 +297,7 @@ fields("mqtt") ->
})
}
, {"mqueue_default_priority",
sc(union(highest, lowest),
sc(hoconsc:enum([highest, lowest]),
#{ default => lowest
})
}
@ -312,11 +312,11 @@ fields("mqtt") ->
})
}
, {"peer_cert_as_username",
sc(hoconsc:union([disabled, cn, dn, crt, pem, md5]),
sc(hoconsc:enum([disabled, cn, dn, crt, pem, md5]),
#{ default => disabled
})}
, {"peer_cert_as_clientid",
sc(hoconsc:union([disabled, cn, dn, crt, pem, md5]),
sc(hoconsc:enum([disabled, cn, dn, crt, pem, md5]),
#{ default => disabled
})}
];
@ -525,7 +525,7 @@ fields("ws_opts") ->
})
}
, {"mqtt_piggyback",
sc(hoconsc:union([single, multiple]),
sc(hoconsc:enum([single, multiple]),
#{ default => multiple
})
}
@ -653,7 +653,7 @@ fields(ssl_client_opts) ->
fields("deflate_opts") ->
[ {"level",
sc(hoconsc:union([none, default, best_compression, best_speed]),
sc(hoconsc:enum([none, default, best_compression, best_speed]),
#{})
}
, {"mem_level",
@ -662,15 +662,15 @@ fields("deflate_opts") ->
})
}
, {"strategy",
sc(hoconsc:union([default, filtered, huffman_only, rle]),
sc(hoconsc:enum([default, filtered, huffman_only, rle]),
#{})
}
, {"server_context_takeover",
sc(hoconsc:union([takeover, no_takeover]),
sc(hoconsc:enum([takeover, no_takeover]),
#{})
}
, {"client_context_takeover",
sc(hoconsc:union([takeover, no_takeover]),
sc(hoconsc:enum([takeover, no_takeover]),
#{})
}
, {"server_max_window_bits",
@ -709,12 +709,12 @@ fields("broker") ->
})
}
, {"session_locking_strategy",
sc(hoconsc:union([local, leader, quorum, all]),
sc(hoconsc:enum([local, leader, quorum, all]),
#{ default => quorum
})
}
, {"shared_subscription_strategy",
sc(hoconsc:union([random, round_robin]),
sc(hoconsc:enum([random, round_robin]),
#{ default => round_robin
})
}
@ -736,7 +736,7 @@ fields("broker") ->
fields("broker_perf") ->
[ {"route_lock_type",
sc(hoconsc:union([key, tab, global]),
sc(hoconsc:enum([key, tab, global]),
#{ default => key
})}
, {"trie_compaction",
@ -962,7 +962,7 @@ the file if it is to be added.
})
}
, {"verify",
sc(hoconsc:union([verify_peer, verify_none]),
sc(hoconsc:enum([verify_peer, verify_none]),
#{ default => Df("verify", verify_none)
})
}

View File

@ -139,7 +139,10 @@ update_pwd(Username, Fun) ->
-spec(lookup_user(binary()) -> [mqtt_admin()]).
lookup_user(Username) when is_binary(Username) -> mnesia:dirty_read(mqtt_admin, Username).
lookup_user(Username) when is_binary(Username) ->
Fun = fun() -> mnesia:read(mqtt_admin, Username) end,
{atomic, User} = ekka_mnesia:ro_transaction(?DASHBOARD_SHARD, Fun),
User.
-spec(all_users() -> [#mqtt_admin{}]).
all_users() -> ets:tab2list(mqtt_admin).

View File

@ -162,7 +162,8 @@ flush({Connection, Route, Subscription}, {Received0, Sent0, Dropped0}) ->
diff(Sent, Sent0),
diff(Dropped, Dropped0)},
Ts = get_local_time(),
_ = mnesia:dirty_write(emqx_collect, #mqtt_collect{timestamp = Ts, collect = Collect}),
ekka_mnesia:transaction(ekka_mnesia:local_content_shard(),
fun mnesia:write/1, [#mqtt_collect{timestamp = Ts, collect = Collect}]),
{Received, Sent, Dropped}.
avg(Items) ->

View File

@ -6,6 +6,11 @@
%% API
-export([spec/1, spec/2]).
-export([translate_req/2]).
-export([namespace/0, fields/1]).
-export([error_codes/1, error_codes/2]).
-define(MAX_ROW_LIMIT, 100).
%% API
-ifdef(TEST).
-compile(export_all).
@ -22,7 +27,8 @@
-define(INIT_SCHEMA, #{fields => #{}, translations => #{}, validations => [], namespace => undefined}).
-define(TO_REF(_N_, _F_), iolist_to_binary([to_bin(_N_), ".", to_bin(_F_)])).
-define(TO_COMPONENTS(_M_, _F_), iolist_to_binary([<<"#/components/schemas/">>, ?TO_REF(namespace(_M_), _F_)])).
-define(TO_COMPONENTS_SCHEMA(_M_, _F_), iolist_to_binary([<<"#/components/schemas/">>, ?TO_REF(namespace(_M_), _F_)])).
-define(TO_COMPONENTS_PARAM(_M_, _F_), iolist_to_binary([<<"#/components/parameters/">>, ?TO_REF(namespace(_M_), _F_)])).
%% @equiv spec(Module, #{check_schema => false})
-spec(spec(module()) ->
@ -54,7 +60,6 @@ spec(Module, Options) ->
end, {[], []}, Paths),
{ApiSpec, components(lists:usort(AllRefs))}.
-spec(translate_req(#{binding => list(), query_string => list(), body => map()},
#{module => module(), path => string(), method => atom()}) ->
{ok, #{binding => list(), query_string => list(), body => map()}}|
@ -64,7 +69,7 @@ translate_req(Request, #{module := Module, path := Path, method := Method}) ->
try
Params = maps:get(parameters, Spec, []),
Body = maps:get(requestBody, Spec, []),
{Bindings, QueryStr} = check_parameters(Request, Params),
{Bindings, QueryStr} = check_parameters(Request, Params, Module),
NewBody = check_requestBody(Request, Body, Module, hoconsc:is_schema(Body)),
{ok, Request#{bindings => Bindings, query_string => QueryStr, body => NewBody}}
catch throw:Error ->
@ -73,6 +78,30 @@ translate_req(Request, #{module := Module, path := Path, method := Method}) ->
{400, 'BAD_REQUEST', iolist_to_binary(io_lib:format("~s : ~p", [Key, Reason]))}
end.
namespace() -> "public".
fields(page) ->
Desc = <<"Page number of the results to fetch.">>,
Meta = #{in => query, desc => Desc, default => 1, example => 1},
[{page, hoconsc:mk(integer(), Meta)}];
fields(limit) ->
Desc = iolist_to_binary([<<"Results per page(max ">>,
integer_to_binary(?MAX_ROW_LIMIT), <<")">>]),
Meta = #{in => query, desc => Desc, default => ?MAX_ROW_LIMIT, example => 50},
[{limit, hoconsc:mk(range(1, ?MAX_ROW_LIMIT), Meta)}].
error_codes(Codes) ->
error_codes(Codes, <<"Error code to troubleshoot problems.">>).
error_codes(Codes = [_ | _], MsgExample) ->
[
{code, hoconsc:mk(hoconsc:enum(Codes))},
{message, hoconsc:mk(string(), #{
desc => <<"Details description of the error.">>,
example => MsgExample
})}
].
support_check_schema(#{check_schema := true}) -> ?DEFAULT_FILTER;
support_check_schema(#{check_schema := Func})when is_function(Func, 2) -> #{filter => Func};
support_check_schema(_) -> #{filter => undefined}.
@ -93,23 +122,28 @@ parse_spec_ref(Module, Path) ->
maps:without([operationId], Schema)),
{maps:get(operationId, Schema), Specs, Refs}.
check_parameters(Request, Spec) ->
check_parameters(Request, Spec, Module) ->
#{bindings := Bindings, query_string := QueryStr} = Request,
BindingsBin = maps:fold(fun(Key, Value, Acc) -> Acc#{atom_to_binary(Key) => Value} end, #{}, Bindings),
check_parameter(Spec, BindingsBin, QueryStr, #{}, #{}).
check_parameter(Spec, BindingsBin, QueryStr, Module, #{}, #{}).
check_parameter([], _Bindings, _QueryStr, NewBindings, NewQueryStr) -> {NewBindings, NewQueryStr};
check_parameter([{Name, Type} | Spec], Bindings, QueryStr, BindingsAcc, QueryStrAcc) ->
check_parameter([?REF(Fields) | Spec], Bindings, QueryStr, LocalMod, BindingsAcc, QueryStrAcc) ->
check_parameter([?R_REF(LocalMod, Fields) | Spec], Bindings, QueryStr, LocalMod, BindingsAcc, QueryStrAcc);
check_parameter([?R_REF(Module, Fields) | Spec], Bindings, QueryStr, LocalMod, BindingsAcc, QueryStrAcc) ->
Params = apply(Module, fields, [Fields]),
check_parameter(Params ++ Spec, Bindings, QueryStr, LocalMod, BindingsAcc, QueryStrAcc);
check_parameter([], _Bindings, _QueryStr, _Module, NewBindings, NewQueryStr) -> {NewBindings, NewQueryStr};
check_parameter([{Name, Type} | Spec], Bindings, QueryStr, Module, BindingsAcc, QueryStrAcc) ->
Schema = ?INIT_SCHEMA#{roots => [{Name, Type}]},
case hocon_schema:field_schema(Type, in) of
path ->
NewBindings = hocon_schema:check_plain(Schema, Bindings, #{atom_key => true, override_env => false}),
NewBindingsAcc = maps:merge(BindingsAcc, NewBindings),
check_parameter(Spec, Bindings, QueryStr, NewBindingsAcc, QueryStrAcc);
check_parameter(Spec, Bindings, QueryStr, Module, NewBindingsAcc, QueryStrAcc);
query ->
NewQueryStr = hocon_schema:check_plain(Schema, QueryStr, #{override_env => false}),
NewQueryStrAcc = maps:merge(QueryStrAcc, NewQueryStr),
check_parameter(Spec, Bindings, QueryStr, BindingsAcc, NewQueryStrAcc)
check_parameter(Spec, Bindings, QueryStr, Module, BindingsAcc, NewQueryStrAcc)
end.
check_requestBody(#{body := Body}, Schema, Module, true) ->
@ -154,19 +188,28 @@ to_spec(Meta, Params, RequestBody, Responses) ->
parameters(Params, Module) ->
{SpecList, AllRefs} =
lists:foldl(fun({Name, Type}, {Acc, RefsAcc}) ->
In = hocon_schema:field_schema(Type, in),
In =:= undefined andalso throw({error, <<"missing in:path/query field in parameters">>}),
Nullable = hocon_schema:field_schema(Type, nullable),
Default = hocon_schema:field_schema(Type, default),
HoconType = hocon_schema:field_schema(Type, type),
Meta = init_meta(Nullable, Default),
{ParamType, Refs} = hocon_schema_to_spec(HoconType, Module),
Spec0 = init_prop([required | ?DEFAULT_FIELDS],
#{schema => maps:merge(ParamType, Meta), name => Name, in => In}, Type),
Spec1 = trans_required(Spec0, Nullable, In),
Spec2 = trans_desc(Spec1, Type),
{[Spec2 | Acc], Refs ++ RefsAcc}
lists:foldl(fun(Param, {Acc, RefsAcc}) ->
case Param of
?REF(StructName) ->
{[#{<<"$ref">> => ?TO_COMPONENTS_PARAM(Module, StructName)} |Acc],
[{Module, StructName, parameter}|RefsAcc]};
?R_REF(RModule, StructName) ->
{[#{<<"$ref">> => ?TO_COMPONENTS_PARAM(RModule, StructName)} |Acc],
[{RModule, StructName, parameter}|RefsAcc]};
{Name, Type} ->
In = hocon_schema:field_schema(Type, in),
In =:= undefined andalso throw({error, <<"missing in:path/query field in parameters">>}),
Nullable = hocon_schema:field_schema(Type, nullable),
Default = hocon_schema:field_schema(Type, default),
HoconType = hocon_schema:field_schema(Type, type),
Meta = init_meta(Nullable, Default),
{ParamType, Refs} = hocon_schema_to_spec(HoconType, Module),
Spec0 = init_prop([required | ?DEFAULT_FIELDS],
#{schema => maps:merge(ParamType, Meta), name => Name, in => In}, Type),
Spec1 = trans_required(Spec0, Nullable, In),
Spec2 = trans_desc(Spec1, Type),
{[Spec2 | Acc], Refs ++ RefsAcc}
end
end, {[], []}, Params),
{lists:reverse(SpecList), AllRefs}.
@ -196,7 +239,7 @@ trans_required(Spec, _, _) -> Spec.
trans_desc(Spec, Hocon) ->
case hocon_schema:field_schema(Hocon, desc) of
undefined -> Spec;
Desc -> Spec#{description => Desc}
Desc -> Spec#{description => to_bin(Desc)}
end.
requestBody([], _Module) -> {[], []};
@ -248,6 +291,13 @@ components([{Module, Field} | Refs], SpecAcc, SubRefsAcc) ->
Namespace = namespace(Module),
{Object, SubRefs} = parse_object(Props, Module),
NewSpecAcc = SpecAcc#{?TO_REF(Namespace, Field) => Object},
components(Refs, NewSpecAcc, SubRefs ++ SubRefsAcc);
%% parameters in ref only have one value, not array
components([{Module, Field, parameter} | Refs], SpecAcc, SubRefsAcc) ->
Props = apply(Module, fields, [Field]),
{[Param], SubRefs} = parameters(Props, Module),
Namespace = namespace(Module),
NewSpecAcc = SpecAcc#{?TO_REF(Namespace, Field) => Param},
components(Refs, NewSpecAcc, SubRefs ++ SubRefsAcc).
namespace(Module) ->
@ -257,10 +307,10 @@ namespace(Module) ->
end.
hocon_schema_to_spec(?R_REF(Module, StructName), _LocalModule) ->
{#{<<"$ref">> => ?TO_COMPONENTS(Module, StructName)},
{#{<<"$ref">> => ?TO_COMPONENTS_SCHEMA(Module, StructName)},
[{Module, StructName}]};
hocon_schema_to_spec(?REF(StructName), LocalModule) ->
{#{<<"$ref">> => ?TO_COMPONENTS(LocalModule, StructName)},
{#{<<"$ref">> => ?TO_COMPONENTS_SCHEMA(LocalModule, StructName)},
[{LocalModule, StructName}]};
hocon_schema_to_spec(Type, _LocalModule) when ?IS_TYPEREFL(Type) ->
{typename_to_spec(typerefl:name(Type)), []};

View File

@ -103,7 +103,8 @@ do_sign(Username, Password) ->
},
Signed = jose_jwt:sign(JWK, JWS, JWT),
{_, Token} = jose_jws:compact(Signed),
ok = ekka_mnesia:dirty_write(format(Token, Username, ExpTime)),
JWTRec = format(Token, Username, ExpTime),
ekka_mnesia:transaction(?DASHBOARD_SHARD, fun mnesia:write/1, [JWTRec]),
{ok, Token}.
do_verify(Token)->
@ -111,8 +112,9 @@ do_verify(Token)->
{ok, JWT = #mqtt_admin_jwt{exptime = ExpTime}} ->
case ExpTime > erlang:system_time(millisecond) of
true ->
ekka_mnesia:dirty_write(JWT#mqtt_admin_jwt{exptime = jwt_expiration_time()}),
ok;
NewJWT = JWT#mqtt_admin_jwt{exptime = jwt_expiration_time()},
{atomic, Res} = ekka_mnesia:transaction(?DASHBOARD_SHARD, fun mnesia:write/1, [NewJWT]),
Res;
_ ->
{error, token_timeout}
end;
@ -132,14 +134,18 @@ do_destroy_by_username(Username) ->
%% jwt internal util function
-spec(lookup(Token :: binary()) -> {ok, #mqtt_admin_jwt{}} | {error, not_found}).
lookup(Token) ->
case mnesia:dirty_read(?TAB, Token) of
[JWT] -> {ok, JWT};
[] -> {error, not_found}
Fun = fun() -> mnesia:read(?TAB, Token) end,
case ekka_mnesia:ro_transaction(?DASHBOARD_SHARD, Fun) of
{atomic, [JWT]} -> {ok, JWT};
{atomic, []} -> {error, not_found}
end.
lookup_by_username(Username) ->
Spec = [{{mqtt_admin_jwt, '_', Username, '_'}, [], ['$_']}],
mnesia:dirty_select(?TAB, Spec).
Fun = fun() -> mnesia:select(?TAB, Spec) end,
{atomic, List} = ekka_mnesia:ro_transaction(?DASHBOARD_SHARD, Fun),
List.
jwk(Username, Password, Salt) ->
Key = erlang:md5(<<Salt/binary, Username/binary, Password/binary>>),
@ -187,7 +193,8 @@ handle_info(clean_jwt, State) ->
timer_clean(self()),
Now = erlang:system_time(millisecond),
Spec = [{{mqtt_admin_jwt, '_', '_', '$1'}, [{'<', '$1', Now}], ['$_']}],
JWTList = mnesia:dirty_select(?TAB, Spec),
{atomic, JWTList} = ekka_mnesia:ro_transaction(?DASHBOARD_SHARD,
fun() -> mnesia:select(?TAB, Spec) end),
destroy(JWTList),
{noreply, State};
handle_info(_Info, State) ->

View File

@ -1,13 +0,0 @@
%%%-------------------------------------------------------------------
%%% @author zhongwen
%%% @copyright (C) 2021, <COMPANY>
%%% @doc
%%%
%%% @end
%%% Created : 22. 9 2021 13:38
%%%-------------------------------------------------------------------
-module(emqx_swagger_util).
-author("zhongwen").
%% API
-export([]).

View File

@ -3,10 +3,10 @@
-behaviour(hocon_schema).
%% API
-export([paths/0, api_spec/0, schema/1]).
-export([t_in_path/1, t_in_query/1, t_in_mix/1, t_without_in/1]).
-export([paths/0, api_spec/0, schema/1, fields/1]).
-export([t_in_path/1, t_in_query/1, t_in_mix/1, t_without_in/1, t_ref/1, t_public_ref/1]).
-export([t_require/1, t_nullable/1, t_method/1, t_api_spec/1]).
-export([t_in_path_trans/1, t_in_query_trans/1, t_in_mix_trans/1]).
-export([t_in_path_trans/1, t_in_query_trans/1, t_in_mix_trans/1, t_ref_trans/1]).
-export([t_in_path_trans_error/1, t_in_query_trans_error/1, t_in_mix_trans_error/1]).
-export([all/0, suite/0, groups/0]).
@ -20,9 +20,9 @@
all() -> [{group, spec}, {group, validation}].
suite() -> [{timetrap, {minutes, 1}}].
groups() -> [
{spec, [parallel], [t_api_spec, t_in_path, t_in_query, t_in_mix,
t_without_in, t_require, t_nullable, t_method]},
{validation, [parallel], [t_in_path_trans, t_in_query_trans, t_in_mix_trans,
{spec, [parallel], [t_api_spec, t_in_path, t_ref, t_in_query, t_in_mix,
t_without_in, t_require, t_nullable, t_method, t_public_ref]},
{validation, [parallel], [t_in_path_trans, t_ref_trans, t_in_query_trans, t_in_mix_trans,
t_in_path_trans_error, t_in_query_trans_error, t_in_mix_trans_error]}
].
@ -44,6 +44,41 @@ t_in_query(_Config) ->
validate("/test/in/query", Expect),
ok.
t_ref(_Config) ->
LocalPath = "/test/in/ref/local",
Path = "/test/in/ref",
Expect = [#{<<"$ref">> => <<"#/components/parameters/emqx_swagger_parameter_SUITE.page">>}],
{OperationId, Spec, Refs} = emqx_dashboard_swagger:parse_spec_ref(?MODULE, Path),
{OperationId, Spec, Refs} = emqx_dashboard_swagger:parse_spec_ref(?MODULE, LocalPath),
?assertEqual(test, OperationId),
Params = maps:get(parameters, maps:get(post, Spec)),
?assertEqual(Expect, Params),
?assertEqual([{?MODULE, page, parameter}], Refs),
ok.
t_public_ref(_Config) ->
Path = "/test/in/ref/public",
Expect = [
#{<<"$ref">> => <<"#/components/parameters/public.page">>},
#{<<"$ref">> => <<"#/components/parameters/public.limit">>}
],
{OperationId, Spec, Refs} = emqx_dashboard_swagger:parse_spec_ref(?MODULE, Path),
?assertEqual(test, OperationId),
Params = maps:get(parameters, maps:get(post, Spec)),
?assertEqual(Expect, Params),
?assertEqual([
{emqx_dashboard_swagger, limit, parameter},
{emqx_dashboard_swagger, page, parameter}
], Refs),
ExpectRefs = [
#{<<"public.limit">> => #{description => <<"Results per page(max 100)">>, example => 50,in => query,name => limit,
schema => #{default => 100,example => 1,maximum => 100, minimum => 1,type => integer}}},
#{<<"public.page">> => #{description => <<"Page number of the results to fetch.">>,
example => 1,in => query,name => page,
schema => #{default => 1,example => 100,type => integer}}}],
?assertEqual(ExpectRefs, emqx_dashboard_swagger:components(Refs)),
ok.
t_in_mix(_Config) ->
Expect =
[#{description => <<"Indicates which sorts of issues to return">>,
@ -115,6 +150,18 @@ t_in_query_trans(_Config) ->
?assertEqual(Expect, trans_parameters(Path, #{}, #{<<"per_page">> => 100})),
ok.
t_ref_trans(_Config) ->
LocalPath = "/test/in/ref/local",
Path = "/test/in/ref",
Expect = {ok, #{bindings => #{},body => #{},
query_string => #{<<"per_page">> => 100}}},
?assertEqual(Expect, trans_parameters(Path, #{}, #{<<"per_page">> => 100})),
?assertEqual(Expect, trans_parameters(LocalPath, #{}, #{<<"per_page">> => 100})),
{400,'BAD_REQUEST', Reason} = trans_parameters(Path, #{}, #{<<"per_page">> => 1010}),
?assertNotEqual(nomatch, binary:match(Reason, [<<"per_page">>])),
{400,'BAD_REQUEST', Reason} = trans_parameters(LocalPath, #{}, #{<<"per_page">> => 1010}),
ok.
t_in_mix_trans(_Config) ->
Path = "/test/in/mix/:state",
Bindings = #{
@ -186,7 +233,7 @@ trans_parameters(Path, Bindings, QueryStr) ->
api_spec() -> emqx_dashboard_swagger:spec(?MODULE).
paths() -> ["/test/in/:filter", "/test/in/query", "/test/in/mix/:state",
paths() -> ["/test/in/:filter", "/test/in/query", "/test/in/mix/:state", "/test/in/ref",
"/required/false", "/nullable/false", "/nullable/true", "/method/ok"].
schema("/test/in/:filter") ->
@ -213,6 +260,33 @@ schema("/test/in/query") ->
responses => #{200 => <<"ok">>}
}
};
schema("/test/in/ref/local") ->
#{
operationId => test,
post => #{
parameters => [hoconsc:ref(page)],
responses => #{200 => <<"ok">>}
}
};
schema("/test/in/ref") ->
#{
operationId => test,
post => #{
parameters => [hoconsc:ref(?MODULE, page)],
responses => #{200 => <<"ok">>}
}
};
schema("/test/in/ref/public") ->
#{
operationId => test,
post => #{
parameters => [
hoconsc:ref(emqx_dashboard_swagger, page),
hoconsc:ref(emqx_dashboard_swagger, limit)
],
responses => #{200 => <<"ok">>}
}
};
schema("/test/in/mix/:state") ->
#{
operationId => test,
@ -257,6 +331,13 @@ schema("/method/ok") ->
#{operationId => test}, ?METHODS);
schema("/method/error") ->
#{operationId => test, bar => #{200 => <<"ok">>}}.
fields(page) ->
[
{per_page,
mk(range(1, 100),
#{in => query, desc => <<"results per page (max 100)">>, example => 1})}
].
to_schema(Params) ->
#{
operationId => test,

View File

@ -101,7 +101,7 @@ t_remote_ref(_Config) ->
{<<"another_ref">>, #{<<"$ref">> => <<"#/components/schemas/emqx_swagger_remote_schema.ref3">>}}], <<"type">> => object}},
#{<<"emqx_swagger_remote_schema.ref3">> => #{<<"properties">> => [
{<<"ip">>, #{description => <<"IP:Port">>, example => <<"127.0.0.1:80">>,type => string}},
{<<"version">>, #{description => "a good version", example => <<"1.0.0">>,type => string}}],
{<<"version">>, #{description => <<"a good version">>, example => <<"1.0.0">>,type => string}}],
<<"type">> => object}}],
?assertEqual(ExpectComponents, Components),
ok.
@ -116,7 +116,7 @@ t_nest_ref(_Config) ->
ExpectComponents = lists:sort([
#{<<"emqx_swagger_requestBody_SUITE.nest_ref">> => #{<<"properties">> => [
{<<"env">>, #{enum => [test,dev,prod],type => string}},
{<<"another_ref">>, #{description => "nest ref", <<"$ref">> => <<"#/components/schemas/emqx_swagger_requestBody_SUITE.good_ref">>}}],
{<<"another_ref">>, #{description => <<"nest ref">>, <<"$ref">> => <<"#/components/schemas/emqx_swagger_requestBody_SUITE.good_ref">>}}],
<<"type">> => object}},
#{<<"emqx_swagger_requestBody_SUITE.good_ref">> => #{<<"properties">> => [
{<<"webhook-host">>, #{default => <<"127.0.0.1:80">>, example => <<"127.0.0.1:80">>,type => string}},

View File

@ -12,7 +12,7 @@
-export([all/0, suite/0, groups/0]).
-export([paths/0, api_spec/0, schema/1, fields/1]).
-export([t_simple_binary/1, t_object/1, t_nest_object/1, t_empty/1,
-export([t_simple_binary/1, t_object/1, t_nest_object/1, t_empty/1, t_error/1,
t_raw_local_ref/1, t_raw_remote_ref/1, t_hocon_schema_function/1,
t_local_ref/1, t_remote_ref/1, t_bad_ref/1, t_none_ref/1, t_nest_ref/1,
t_ref_array_with_key/1, t_ref_array_without_key/1, t_api_spec/1]).
@ -21,7 +21,7 @@ all() -> [{group, spec}].
suite() -> [{timetrap, {minutes, 1}}].
groups() -> [
{spec, [parallel], [
t_api_spec, t_simple_binary, t_object, t_nest_object,
t_api_spec, t_simple_binary, t_object, t_nest_object, t_error,
t_raw_local_ref, t_raw_remote_ref, t_empty, t_hocon_schema_function,
t_local_ref, t_remote_ref, t_bad_ref, t_none_ref,
t_ref_array_with_key, t_ref_array_without_key, t_nest_ref]}
@ -48,6 +48,33 @@ t_object(_config) ->
validate(Path, Object, ExpectRefs),
ok.
t_error(_Config) ->
Path = "/error",
Error400 = #{<<"content">> =>
#{<<"application/json">> => #{<<"schema">> => #{<<"type">> => object,
<<"properties">> =>
[
{<<"code">>, #{enum => ['Bad1','Bad2'], type => string}},
{<<"message">>, #{description => <<"Details description of the error.">>,
example => <<"Bad request desc">>, type => string}}]
}}}},
Error404 = #{<<"content">> =>
#{<<"application/json">> => #{<<"schema">> => #{<<"type">> => object,
<<"properties">> =>
[
{<<"code">>, #{enum => ['Not-Found'], type => string}},
{<<"message">>, #{description => <<"Details description of the error.">>,
example => <<"Error code to troubleshoot problems.">>, type => string}}]
}}}},
{OperationId, Spec, Refs} = emqx_dashboard_swagger:parse_spec_ref(?MODULE, Path),
?assertEqual(test, OperationId),
Response = maps:get(responses, maps:get(get, Spec)),
?assertEqual(Error400, maps:get(<<"400">>, Response)),
?assertEqual(Error404, maps:get(<<"404">>, Response)),
?assertEqual(#{}, maps:without([<<"400">>, <<"404">>], Response)),
?assertEqual([], Refs),
ok.
t_nest_object(_Config) ->
Path = "/nest/object",
Object =
@ -175,7 +202,7 @@ t_hocon_schema_function(_Config) ->
#{<<"emqx_swagger_remote_schema.ref3">> => #{<<"type">> => object,
<<"properties">> => [
{<<"ip">>, #{description => <<"IP:Port">>, example => <<"127.0.0.1:80">>,type => string}},
{<<"version">>, #{description => "a good version", example => <<"1.0.0">>, type => string}}]
{<<"version">>, #{description => <<"a good version">>, example => <<"1.0.0">>, type => string}}]
}},
#{<<"emqx_swagger_remote_schema.root">> => #{required => [<<"default_password">>, <<"default_username">>],
<<"properties">> => [{<<"listeners">>, #{items =>
@ -255,7 +282,15 @@ schema("/ref/array/with/key") ->
schema("/ref/array/without/key") ->
to_schema(mk(hoconsc:array(hoconsc:ref(?MODULE, good_ref)), #{}));
schema("/ref/hocon/schema/function") ->
to_schema(mk(hoconsc:ref(emqx_swagger_remote_schema, "root"), #{})).
to_schema(mk(hoconsc:ref(emqx_swagger_remote_schema, "root"), #{}));
schema("/error") ->
#{
operationId => test,
get => #{responses => #{
400 => emqx_dashboard_swagger:error_codes(['Bad1', 'Bad2'], <<"Bad request desc">>),
404 => emqx_dashboard_swagger:error_codes(['Not-Found'])
}}
}.
validate(Path, ExpectObject, ExpectRefs) ->
{OperationId, Spec, Refs} = emqx_dashboard_swagger:parse_spec_ref(?MODULE, Path),

View File

@ -102,7 +102,7 @@ fields("cluster") ->
, default => emqxcl
})}
, {"discovery_strategy",
sc(union([manual, static, mcast, dns, etcd, k8s]),
sc(hoconsc:enum([manual, static, mcast, dns, etcd, k8s]),
#{ default => manual
})}
, {"autoclean",
@ -122,7 +122,7 @@ fields("cluster") ->
sc(ref(cluster_mcast),
#{})}
, {"proto_dist",
sc(union([inet_tcp, inet6_tcp, inet_tls]),
sc(hoconsc:enum([inet_tcp, inet6_tcp, inet_tls]),
#{ mapping => "ekka.proto_dist"
, default => inet_tcp
})}
@ -136,7 +136,7 @@ fields("cluster") ->
sc(ref(cluster_k8s),
#{})}
, {"db_backend",
sc(union([mnesia, rlog]),
sc(hoconsc:enum([mnesia, rlog]),
#{ mapping => "ekka.db_backend"
, default => mnesia
})}
@ -224,7 +224,7 @@ fields(cluster_k8s) ->
#{ default => "emqx"
})}
, {"address_type",
sc(union([ip, dns, hostname]),
sc(hoconsc:enum([ip, dns, hostname]),
#{})}
, {"app_name",
sc(string(),
@ -242,7 +242,7 @@ fields(cluster_k8s) ->
fields("rlog") ->
[ {"role",
sc(union([core, replicant]),
sc(hoconsc:enum([core, replicant]),
#{ mapping => "ekka.node_role"
, default => core
})}
@ -334,7 +334,7 @@ fields("cluster_call") ->
fields("rpc") ->
[ {"mode",
sc(union(sync, async),
sc(hoconsc:enum([sync, async]),
#{ default => async
})}
, {"async_batch_size",
@ -343,7 +343,7 @@ fields("rpc") ->
, default => 256
})}
, {"port_discovery",
sc(union(manual, stateless),
sc(hoconsc:enum([manual, stateless]),
#{ mapping => "gen_rpc.port_discovery"
, default => stateless
})}
@ -434,7 +434,7 @@ fields("log_file_handler") ->
sc(ref("log_rotation"),
#{})}
, {"max_size",
sc(union([infinity, emqx_schema:bytesize()]),
sc(hoconsc:union([infinity, emqx_schema:bytesize()]),
#{ default => "10MB"
})}
] ++ log_handler_common_confs();
@ -464,7 +464,7 @@ fields("log_overload_kill") ->
#{ default => 20000
})}
, {"restart_after",
sc(union(emqx_schema:duration(), infinity),
sc(hoconsc:union([emqx_schema:duration(), infinity]),
#{ default => "5s"
})}
];
@ -582,7 +582,7 @@ log_handler_common_confs() ->
#{ default => unlimited
})}
, {"formatter",
sc(union([text, json]),
sc(hoconsc:enum([text, json]),
#{ default => text
})}
, {"single_line",
@ -608,11 +608,11 @@ log_handler_common_confs() ->
sc(ref("log_burst_limit"),
#{})}
, {"supervisor_reports",
sc(union([error, progress]),
sc(hoconsc:enum([error, progress]),
#{ default => error
})}
, {"max_depth",
sc(union([unlimited, integer()]),
sc(hoconsc:union([unlimited, integer()]),
#{ default => 100
})}
].

View File

@ -124,7 +124,7 @@ t_catch_up_status_handle_next_commit(_Config) ->
t_commit_ok_apply_fail_on_other_node_then_recover(_Config) ->
emqx_cluster_rpc:reset(),
{atomic, []} = emqx_cluster_rpc:status(),
Now = erlang:system_time(second),
Now = erlang:system_time(millisecond),
{M, F, A} = {?MODULE, failed_on_other_recover_after_5_second, [erlang:whereis(?NODE1), Now]},
{ok, _, ok} = emqx_cluster_rpc:multicall(M, F, A, 1, 1000),
{ok, _, ok} = emqx_cluster_rpc:multicall(io, format, ["test"], 1, 1000),
@ -132,10 +132,10 @@ t_commit_ok_apply_fail_on_other_node_then_recover(_Config) ->
?assertEqual([], L),
?assertEqual({io, format, ["test"]}, maps:get(mfa, Status)),
?assertEqual(node(), maps:get(node, Status)),
sleep(3000),
sleep(2300),
{atomic, [Status1]} = emqx_cluster_rpc:status(),
?assertEqual(Status, Status1),
sleep(2600),
sleep(3600),
{atomic, NewStatus} = emqx_cluster_rpc:status(),
?assertEqual(3, length(NewStatus)),
Pid = self(),
@ -243,11 +243,11 @@ failed_on_node_by_odd(Pid) ->
end.
failed_on_other_recover_after_5_second(Pid, CreatedAt) ->
Now = erlang:system_time(second),
Now = erlang:system_time(millisecond),
case Pid =:= self() of
true -> ok;
false ->
case Now < CreatedAt + 5 of
case Now < CreatedAt + 5001 of
true -> "MFA return not ok";
false -> ok
end

View File

@ -18,22 +18,19 @@
-behavior(minirest_api).
-import(emqx_mgmt_util, [ page_params/0
, schema/1
, schema/2
, object_schema/2
, error_schema/2
, page_object_schema/1
, properties/1
]).
-include_lib("typerefl/include/types.hrl").
-import(hoconsc, [mk/2, ref/1, ref/2]).
-define(MAX_PAYLOAD_LENGTH, 2048).
-define(PAYLOAD_TOO_LARGE, 'PAYLOAD_TOO_LARGE').
-export([ status/2
, delayed_messages/2
, delayed_message/2
]).
-export([status/2
, delayed_messages/2
, delayed_message/2
]).
-export([paths/0, fields/1, schema/1]).
%% for rpc
-export([update_config_/1]).
@ -49,91 +46,94 @@
-define(MESSAGE_ID_SCHEMA_ERROR, 'MESSAGE_ID_SCHEMA_ERROR').
api_spec() ->
{
[status_api(), delayed_messages_api(), delayed_message_api()],
[]
}.
emqx_dashboard_swagger:spec(?MODULE).
conf_schema() ->
emqx_mgmt_api_configs:gen_schema(emqx:get_raw_config([delayed])).
properties() ->
PayloadDesc = io_lib:format("Payload, base64 encode. Payload will be ~p if length large than ~p",
[?PAYLOAD_TOO_LARGE, ?MAX_PAYLOAD_LENGTH]),
properties([
{msgid, integer, <<"Message Id">>},
{publish_at, string, <<"Client publish message time, rfc 3339">>},
{delayed_interval, integer, <<"Delayed interval, second">>},
{delayed_remaining, integer, <<"Delayed remaining, second">>},
{expected_at, string, <<"Expect publish time, rfc 3339">>},
{topic, string, <<"Topic">>},
{qos, string, <<"QoS">>},
{payload, string, iolist_to_binary(PayloadDesc)},
{from_clientid, string, <<"From ClientId">>},
{from_username, string, <<"From Username">>}
]).
paths() -> ["/mqtt/delayed", "/mqtt/delayed/messages", "/mqtt/delayed/messages/:msgid"].
parameters() ->
[#{
name => msgid,
in => path,
schema => #{type => string},
required => true
}].
status_api() ->
Metadata = #{
schema("/mqtt/delayed") ->
#{
operationId => status,
get => #{
tags => [<<"mqtt">>],
description => <<"Get delayed status">>,
summary => <<"Get delayed status">>,
responses => #{
<<"200">> => schema(conf_schema())}
},
200 => ref(emqx_modules_schema, "delayed")
}
},
put => #{
tags => [<<"mqtt">>],
description => <<"Enable or disable delayed, set max delayed messages">>,
'requestBody' => schema(conf_schema()),
requestBody => ref(emqx_modules_schema, "delayed"),
responses => #{
<<"200">> =>
schema(conf_schema(), <<"Enable or disable delayed successfully">>),
<<"400">> =>
error_schema(<<"Max limit illegality">>, [?BAD_REQUEST])
200 => mk(ref(emqx_modules_schema, "delayed"),
#{desc => <<"Enable or disable delayed successfully">>}),
400 => emqx_dashboard_swagger:error_codes([?BAD_REQUEST], <<"Max limit illegality">>)
}
}
},
{"/mqtt/delayed", Metadata, status}.
};
delayed_messages_api() ->
Metadata = #{
get => #{
description => "List delayed messages",
parameters => page_params(),
responses => #{
<<"200">> => page_object_schema(properties())
}
}
},
{"/mqtt/delayed/messages", Metadata, delayed_messages}.
delayed_message_api() ->
Metadata = #{
schema("/mqtt/delayed/messages/:msgid") ->
#{operationId => delayed_message,
get => #{
tags => [<<"mqtt">>],
description => <<"Get delayed message">>,
parameters => parameters(),
parameters => [{msgid, mk(binary(), #{in => path, desc => <<"delay message ID">>})}],
responses => #{
<<"400">> => error_schema(<<"Message ID Schema error">>, [?MESSAGE_ID_SCHEMA_ERROR]),
<<"404">> => error_schema(<<"Message ID not found">>, [?MESSAGE_ID_NOT_FOUND]),
<<"200">> => object_schema(maps:without([payload], properties()), <<"Get delayed message success">>)
200 => ref("message_without_payload"),
400 => emqx_dashboard_swagger:error_codes([?MESSAGE_ID_SCHEMA_ERROR], <<"Bad MsgId format">>),
404 => emqx_dashboard_swagger:error_codes([?MESSAGE_ID_NOT_FOUND], <<"MsgId not found">>)
}
},
delete => #{
tags => [<<"mqtt">>],
description => <<"Delete delayed message">>,
parameters => parameters(),
parameters => [{msgid, mk(binary(), #{in => path, desc => <<"delay message ID">>})}],
responses => #{
<<"400">> => error_schema(<<"Message ID Schema error">>, [?MESSAGE_ID_SCHEMA_ERROR]),
<<"404">> => error_schema(<<"Message ID not found">>, [?MESSAGE_ID_NOT_FOUND]),
<<"200">> => schema(<<"Delete delayed message success">>)
200 => <<"Delete delayed message success">>,
400 => emqx_dashboard_swagger:error_codes([?MESSAGE_ID_SCHEMA_ERROR], <<"Bad MsgId format">>),
404 => emqx_dashboard_swagger:error_codes([?MESSAGE_ID_NOT_FOUND], <<"MsgId not found">>)
}
}
},
{"/mqtt/delayed/messages/:msgid", Metadata, delayed_message}.
};
schema("/mqtt/delayed/messages") ->
#{
operationId => delayed_messages,
get => #{
tags => [<<"mqtt">>],
description => <<"List delayed messages">>,
parameters => [ref(emqx_dashboard_swagger, page), ref(emqx_dashboard_swagger, limit)],
responses => #{
200 =>
[
{data, mk(hoconsc:array(ref("message")), #{})},
{meta, [
{page, mk(integer(), #{})},
{limit, mk(integer(), #{})},
{count, mk(integer(), #{})}
]}
]
}
}
}.
fields("message_without_payload") ->
[
{msgid, mk(integer(), #{desc => <<"Message Id (MQTT message id hash)">>})},
{publish_at, mk(binary(), #{desc => <<"Client publish message time, rfc 3339">>})},
{delayed_interval, mk(integer(), #{desc => <<"Delayed interval, second">>})},
{delayed_remaining, mk(integer(), #{desc => <<"Delayed remaining, second">>})},
{expected_at, mk(binary(), #{desc => <<"Expect publish time, rfc 3339">>})},
{topic, mk(binary(), #{desc => <<"Topic">>, example => <<"/sys/#">>})},
{qos, mk(binary(), #{desc => <<"QoS">>})},
{from_clientid, mk(binary(), #{desc => <<"From ClientId">>})},
{from_username, mk(binary(), #{desc => <<"From Username">>})}
];
fields("message") ->
PayloadDesc = io_lib:format("Payload, base64 encode. Payload will be ~p if length large than ~p",
[?PAYLOAD_TOO_LARGE, ?MAX_PAYLOAD_LENGTH]),
fields("message_without_payload") ++
[{payload, mk(binary(), #{desc => iolist_to_binary(PayloadDesc)})}].
%%--------------------------------------------------------------------
%% HTTP API
@ -210,7 +210,7 @@ generate_max_delayed_messages(Config) ->
update_config_(Config) ->
lists:foreach(fun(Node) ->
update_config_(Node, Config)
end, ekka_mnesia:running_nodes()).
end, ekka_mnesia:running_nodes()).
update_config_(Node, Config) when Node =:= node() ->
_ = emqx_delayed:update_config(Config),

View File

@ -18,8 +18,7 @@
%% Check for the mnesia calls forbidden by Ekka:
{xref_queries,
[ {"E || \"mnesia\":\"dirty_write\"/\".*\" : Fun", [{{emqx_dashboard_collection,flush,2},{mnesia,dirty_write,2}}]}
, {"E || \"mnesia\":\"dirty_delete.*\"/\".*\" : Fun", []}
[ {"E || \"mnesia\":\"dirty_delete.*\"/\".*\" : Fun", []}
, {"E || \"mnesia\":\"transaction\"/\".*\" : Fun", []}
, {"E || \"mnesia\":\"async_dirty\"/\".*\" : Fun", []}
, {"E || \"mnesia\":\"clear_table\"/\".*\" : Fun", []}
@ -52,7 +51,7 @@
, {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.8.3"}}}
, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.10.8"}}}
, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}}
, {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.2.4"}}}
, {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.2.5"}}}
, {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.1"}}}
, {replayq, "0.3.3"}
, {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}}