Merge pull request #7624 from DDDHuang/fix_api_params

Fix api params
This commit is contained in:
Xinyu Liu 2022-04-18 09:33:22 +08:00 committed by GitHub
commit 21fe7f01ee
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
29 changed files with 130 additions and 166 deletions

View File

@ -24,7 +24,6 @@
{deps, [ {deps, [
{lc, {git, "https://github.com/emqx/lc.git", {tag, "0.2.1"}}}, {lc, {git, "https://github.com/emqx/lc.git", {tag, "0.2.1"}}},
{gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}}, {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}},
{typerefl, {git, "https://github.com/ieQu1/typerefl", {tag, "0.8.6"}}},
{jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}, {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}},
{cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.0"}}}, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.0"}}},
{esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.1"}}}, {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.1"}}},

View File

@ -149,8 +149,8 @@ fields(response_users) ->
paginated_list_type(ref(response_user)); paginated_list_type(ref(response_user));
fields(pagination_meta) -> fields(pagination_meta) ->
[ [
{page, non_neg_integer()}, {page, pos_integer()},
{limit, non_neg_integer()}, {limit, pos_integer()},
{count, non_neg_integer()} {count, non_neg_integer()}
]. ].
@ -431,8 +431,10 @@ schema("/authentication/:id/users") ->
description => <<"List users in authenticator in global authentication chain">>, description => <<"List users in authenticator in global authentication chain">>,
parameters => [ parameters => [
param_auth_id(), param_auth_id(),
{page, mk(integer(), #{in => query, desc => <<"Page Index">>, required => false})}, {page,
{limit, mk(integer(), #{in => query, desc => <<"Page Limit">>, required => false})}, mk(pos_integer(), #{in => query, desc => <<"Page Index">>, required => false})},
{limit,
mk(pos_integer(), #{in => query, desc => <<"Page Limit">>, required => false})},
{like_username, {like_username,
mk(binary(), #{ mk(binary(), #{
in => query, in => query,
@ -481,8 +483,10 @@ schema("/listeners/:listener_id/authentication/:id/users") ->
parameters => [ parameters => [
param_listener_id(), param_listener_id(),
param_auth_id(), param_auth_id(),
{page, mk(integer(), #{in => query, desc => <<"Page Index">>, required => false})}, {page,
{limit, mk(integer(), #{in => query, desc => <<"Page Limit">>, required => false})} mk(pos_integer(), #{in => query, desc => <<"Page Index">>, required => false})},
{limit,
mk(pos_integer(), #{in => query, desc => <<"Page Limit">>, required => false})}
], ],
responses => #{ responses => #{
200 => emqx_dashboard_swagger:schema_with_example( 200 => emqx_dashboard_swagger:schema_with_example(

View File

@ -95,7 +95,7 @@ For example: `http://localhost:9901/`
, desc => "The type of the pool. Can be one of `random`, `hash`." , desc => "The type of the pool. Can be one of `random`, `hash`."
})} })}
, {pool_size, , {pool_size,
sc(non_neg_integer(), sc(pos_integer(),
#{ default => 8 #{ default => 8
, desc => "The pool size." , desc => "The pool size."
})} })}

View File

@ -76,8 +76,8 @@ fields(sharded) ->
, {w_mode, fun w_mode/1} , {w_mode, fun w_mode/1}
] ++ mongo_fields(); ] ++ mongo_fields();
fields(topology) -> fields(topology) ->
[ {pool_size, fun internal_pool_size/1} [ {pool_size, fun emqx_connector_schema_lib:pool_size/1}
, {max_overflow, fun emqx_connector_schema_lib:pool_size/1} , {max_overflow, fun max_overflow/1}
, {overflow_ttl, fun duration/1} , {overflow_ttl, fun duration/1}
, {overflow_check_period, fun duration/1} , {overflow_check_period, fun duration/1}
, {local_threshold_ms, fun duration/1} , {local_threshold_ms, fun duration/1}
@ -114,12 +114,6 @@ mongo_fields() ->
] ++ ] ++
emqx_connector_schema_lib:ssl_fields(). emqx_connector_schema_lib:ssl_fields().
internal_pool_size(type) -> integer();
internal_pool_size(desc) -> "Pool size on start.";
internal_pool_size(default) -> 1;
internal_pool_size(validator) -> [?MIN(1)];
internal_pool_size(_) -> undefined.
%% =================================================================== %% ===================================================================
on_start(InstId, Config = #{mongo_type := Type, on_start(InstId, Config = #{mongo_type := Type,
@ -334,6 +328,11 @@ duration(desc) -> "Time interval, such as timeout or TTL.";
duration(required) -> false; duration(required) -> false;
duration(_) -> undefined. duration(_) -> undefined.
max_overflow(type) -> non_neg_integer();
max_overflow(desc) -> "Max Overflow.";
max_overflow(default) -> 0;
max_overflow(_) -> undefined.
replica_set_name(type) -> binary(); replica_set_name(type) -> binary();
replica_set_name(desc) -> "Name of the replica set."; replica_set_name(desc) -> "Name of the replica set.";
replica_set_name(required) -> false; replica_set_name(required) -> false;

View File

@ -34,7 +34,7 @@
]). ]).
-type database() :: binary(). -type database() :: binary().
-type pool_size() :: integer(). -type pool_size() :: pos_integer().
-type username() :: binary(). -type username() :: binary().
-type password() :: binary(). -type password() :: binary().
@ -72,7 +72,7 @@ database(required) -> true;
database(validator) -> [?NOT_EMPTY("the value of the field 'database' cannot be empty")]; database(validator) -> [?NOT_EMPTY("the value of the field 'database' cannot be empty")];
database(_) -> undefined. database(_) -> undefined.
pool_size(type) -> integer(); pool_size(type) -> pos_integer();
pool_size(desc) -> "Size of the connection pool."; pool_size(desc) -> "Size of the connection pool.";
pool_size(default) -> 8; pool_size(default) -> 8;
pool_size(validator) -> [?MIN(1)]; pool_size(validator) -> [?MIN(1)];

View File

@ -93,7 +93,7 @@ topic filters for 'remote_topic' of ingress connections.
"messages in case of ACK not received.", "messages in case of ACK not received.",
#{default => "15s"})} #{default => "15s"})}
, {max_inflight, , {max_inflight,
sc(integer(), sc(non_neg_integer(),
#{ default => 32 #{ default => 32
, desc => "Max inflight (sent, but un-acked) messages of the MQTT protocol" , desc => "Max inflight (sent, but un-acked) messages of the MQTT protocol"
})} })}

View File

@ -1,9 +1,6 @@
%% -*- mode: erlang -*- %% -*- mode: erlang -*-
{deps, {deps, [{emqx, {path, "../emqx"}}]}.
[ {typerefl, {git, "https://github.com/ieQu1/typerefl", {tag, "0.8.6"}}}
, {emqx, {path, "../emqx"}}
]}.
{edoc_opts, [{preprocess, true}]}. {edoc_opts, [{preprocess, true}]}.
{erl_opts, [warn_unused_vars, {erl_opts, [warn_unused_vars,

View File

@ -107,7 +107,7 @@ fields(sampler) ->
Samplers = Samplers =
[{SamplerName, hoconsc:mk(integer(), #{desc => swagger_desc(SamplerName)})} [{SamplerName, hoconsc:mk(integer(), #{desc => swagger_desc(SamplerName)})}
|| SamplerName <- ?SAMPLER_LIST], || SamplerName <- ?SAMPLER_LIST],
[{time_stamp, hoconsc:mk(integer(), #{desc => <<"Timestamp">>})} | Samplers]; [{time_stamp, hoconsc:mk(non_neg_integer(), #{desc => <<"Timestamp">>})} | Samplers];
fields(sampler_current) -> fields(sampler_current) ->
[{SamplerName, hoconsc:mk(integer(), #{desc => swagger_desc(SamplerName)})} [{SamplerName, hoconsc:mk(integer(), #{desc => swagger_desc(SamplerName)})}

View File

@ -127,7 +127,7 @@ namespace() -> "public".
fields(page) -> fields(page) ->
Desc = <<"Page number of the results to fetch.">>, Desc = <<"Page number of the results to fetch.">>,
Meta = #{in => query, desc => Desc, default => 1, example => 1}, Meta = #{in => query, desc => Desc, default => 1, example => 1},
[{page, hoconsc:mk(integer(), Meta)}]; [{page, hoconsc:mk(pos_integer(), Meta)}];
fields(limit) -> fields(limit) ->
Desc = iolist_to_binary([ Desc = iolist_to_binary([
<<"Results per page(max ">>, <<"Results per page(max ">>,
@ -582,6 +582,8 @@ typename_to_spec("float()", _Mod) ->
typename_to_spec("integer()", _Mod) -> typename_to_spec("integer()", _Mod) ->
#{type => integer, example => 100}; #{type => integer, example => 100};
typename_to_spec("non_neg_integer()", _Mod) -> typename_to_spec("non_neg_integer()", _Mod) ->
#{type => integer, minimum => 0, example => 100};
typename_to_spec("pos_integer()", _Mod) ->
#{type => integer, minimum => 1, example => 100}; #{type => integer, minimum => 1, example => 100};
typename_to_spec("number()", _Mod) -> typename_to_spec("number()", _Mod) ->
#{type => number, example => 42}; #{type => number, example => 42};

View File

@ -101,7 +101,7 @@ t_public_ref(_Config) ->
minimum => 1,type => integer}}}, minimum => 1,type => integer}}},
#{<<"public.page">> => #{description => <<"Page number of the results to fetch.">>, #{<<"public.page">> => #{description => <<"Page number of the results to fetch.">>,
example => 1,in => query,name => page, example => 1,in => query,name => page,
schema => #{default => 1,example => 100,type => integer}}}], schema => #{default => 1,example => 100,minimum => 1,type => integer}}}],
?assertEqual(ExpectRefs, emqx_dashboard_swagger:components(Refs,#{})), ?assertEqual(ExpectRefs, emqx_dashboard_swagger:components(Refs,#{})),
ok. ok.

View File

@ -185,7 +185,7 @@ t_complicated_type(_Config) ->
Object = #{<<"content">> => #{<<"application/json">> => Object = #{<<"content">> => #{<<"application/json">> =>
#{<<"schema">> => #{<<"properties">> => #{<<"schema">> => #{<<"properties">> =>
[ [
{<<"no_neg_integer">>, #{example => 100, minimum => 1, type => integer}}, {<<"no_neg_integer">>, #{example => 100, minimum => 0, type => integer}},
{<<"url">>, #{example => <<"http://127.0.0.1">>, type => string}}, {<<"url">>, #{example => <<"http://127.0.0.1">>, type => string}},
{<<"server">>, #{example => <<"127.0.0.1:80">>, type => string}}, {<<"server">>, #{example => <<"127.0.0.1:80">>, type => string}},
{<<"connect_timeout">>, #{example => infinity, <<"oneOf">> => [ {<<"connect_timeout">>, #{example => infinity, <<"oneOf">> => [

View File

@ -92,7 +92,7 @@ fields(server) ->
)}, )},
{pool_size, {pool_size,
sc( sc(
integer(), pos_integer(),
#{ #{
default => 8, default => 8,
example => 8, example => 8,

View File

@ -284,12 +284,12 @@ fields(gateway_overview) ->
)}, )},
{max_connections, {max_connections,
mk( mk(
integer(), pos_integer(),
#{desc => <<"The Gateway allowed maximum connections/clients">>} #{desc => <<"The Gateway allowed maximum connections/clients">>}
)}, )},
{current_connections, {current_connections,
mk( mk(
integer(), non_neg_integer(),
#{desc => <<"The Gateway current connected connections/clients">>} #{desc => <<"The Gateway current connected connections/clients">>}
)}, )},
{listeners, {listeners,
@ -410,11 +410,11 @@ convert_listener_struct(Schema) ->
), ),
lists:keystore(listeners, 1, Schema1, {listeners, ListenerSchema}). lists:keystore(listeners, 1, Schema1, {listeners, ListenerSchema}).
remove_listener_and_authn(Schmea) -> remove_listener_and_authn(Schema) ->
lists:keydelete( lists:keydelete(
authentication, authentication,
1, 1,
lists:keydelete(listeners, 1, Schmea) lists:keydelete(listeners, 1, Schema)
). ).
listeners_schema(?R_REF(_Mod, tcp_listeners)) -> listeners_schema(?R_REF(_Mod, tcp_listeners)) ->

View File

@ -384,7 +384,7 @@ params_paging_in_qs() ->
[ [
{page, {page,
mk( mk(
integer(), pos_integer(),
#{ #{
in => query, in => query,
required => false, required => false,
@ -394,7 +394,7 @@ params_paging_in_qs() ->
)}, )},
{limit, {limit,
mk( mk(
integer(), pos_integer(),
#{ #{
in => query, in => query,
required => false, required => false,

View File

@ -392,21 +392,21 @@ format_channel_info({_, Infos, Stats} = R) ->
{heap_size, Stats, 0}, {heap_size, Stats, 0},
{reductions, Stats, 0} {reductions, Stats, 0}
], ],
eval(FetchX ++ extra_feilds(R)). eval(FetchX ++ extra_fields(R)).
extra_feilds({_, Infos, _Stats} = R) -> extra_fields({_, Infos, _Stats} = R) ->
extra_feilds( extra_fields(
maps:get(protocol, maps:get(clientinfo, Infos)), maps:get(protocol, maps:get(clientinfo, Infos)),
R R
). ).
extra_feilds(lwm2m, {_, Infos, _Stats}) -> extra_fields(lwm2m, {_, Infos, _Stats}) ->
ClientInfo = maps:get(clientinfo, Infos, #{}), ClientInfo = maps:get(clientinfo, Infos, #{}),
[ [
{endpoint_name, ClientInfo}, {endpoint_name, ClientInfo},
{lifetime, ClientInfo} {lifetime, ClientInfo}
]; ];
extra_feilds(_, _) -> extra_fields(_, _) ->
[]. [].
eval(Ls) -> eval(Ls) ->
@ -495,7 +495,7 @@ schema("/gateway/:name/clients/:clientid/subscriptions") ->
#{ #{
200 => emqx_dashboard_swagger:schema_with_examples( 200 => emqx_dashboard_swagger:schema_with_examples(
hoconsc:array(ref(subscription)), hoconsc:array(ref(subscription)),
examples_subsctiption_list() examples_subscription_list()
) )
} }
) )
@ -506,14 +506,14 @@ schema("/gateway/:name/clients/:clientid/subscriptions") ->
parameters => params_client_insta(), parameters => params_client_insta(),
'requestBody' => emqx_dashboard_swagger:schema_with_examples( 'requestBody' => emqx_dashboard_swagger:schema_with_examples(
ref(subscription), ref(subscription),
examples_subsctiption() examples_subscription()
), ),
responses => responses =>
?STANDARD_RESP( ?STANDARD_RESP(
#{ #{
201 => emqx_dashboard_swagger:schema_with_examples( 201 => emqx_dashboard_swagger:schema_with_examples(
ref(subscription), ref(subscription),
examples_subsctiption() examples_subscription()
) )
} }
) )
@ -664,7 +664,7 @@ params_paging() ->
[ [
{page, {page,
mk( mk(
integer(), pos_integer(),
#{ #{
in => query, in => query,
required => false, required => false,
@ -674,7 +674,7 @@ params_paging() ->
)}, )},
{limit, {limit,
mk( mk(
integer(), pos_integer(),
#{ #{
in => query, in => query,
desc => <<"Page Limit">>, desc => <<"Page Limit">>,
@ -1089,7 +1089,7 @@ examples_client() ->
} }
}. }.
examples_subsctiption_list() -> examples_subscription_list() ->
#{ #{
general_subscription_list => general_subscription_list =>
#{ #{
@ -1103,7 +1103,7 @@ examples_subsctiption_list() ->
} }
}. }.
examples_subsctiption() -> examples_subscription() ->
#{ #{
general_subscription => general_subscription =>
#{ #{

View File

@ -191,7 +191,7 @@ users(get, #{bindings := #{name := Name0, id := Id}, query_string := Qs}) ->
Name0, Name0,
Id, Id,
fun(_GwName, #{id := AuthId, chain_name := ChainName}) -> fun(_GwName, #{id := AuthId, chain_name := ChainName}) ->
emqx_authn_api:list_users(ChainName, AuthId, page_pramas(Qs)) emqx_authn_api:list_users(ChainName, AuthId, page_params(Qs))
end end
); );
users(post, #{ users(post, #{
@ -261,7 +261,7 @@ import_users(post, #{
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Utils %% Utils
page_pramas(Qs) -> page_params(Qs) ->
maps:with([<<"page">>, <<"limit">>], Qs). maps:with([<<"page">>, <<"limit">>], Qs).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -555,7 +555,7 @@ params_paging_in_qs() ->
[ [
{page, {page,
mk( mk(
integer(), pos_integer(),
#{ #{
in => query, in => query,
required => false, required => false,
@ -565,7 +565,7 @@ params_paging_in_qs() ->
)}, )},
{limit, {limit,
mk( mk(
integer(), pos_integer(),
#{ #{
in => query, in => query,
required => false, required => false,
@ -627,10 +627,12 @@ fields(tcp_listener_opts) ->
{high_watermark, mk(binary(), #{})}, {high_watermark, mk(binary(), #{})},
{nodelay, mk(boolean(), #{})}, {nodelay, mk(boolean(), #{})},
{reuseaddr, boolean()}, {reuseaddr, boolean()},
%% TODO: duri
{send_timeout, binary()}, {send_timeout, binary()},
{send_timeout_close, boolean()} {send_timeout_close, boolean()}
]; ];
fields(ssl_listener_opts) -> fields(ssl_listener_opts) ->
%% TODO: maybe use better ssl options schema from emqx_ssl_lib or somewhere
[ [
{cacertfile, binary()}, {cacertfile, binary()},
{certfile, binary()}, {certfile, binary()},
@ -762,7 +764,7 @@ common_listener_opts() ->
required => false, required => false,
desc => desc =>
<< <<
"The Mounpoint for clients of the listener. " "The Mountpoint for clients of the listener. "
"The gateway-level mountpoint configuration can be overloaded " "The gateway-level mountpoint configuration can be overloaded "
"when it is not null or empty string" "when it is not null or empty string"
>> >>
@ -774,7 +776,7 @@ common_listener_opts() ->
emqx_authn_schema:authenticator_type(), emqx_authn_schema:authenticator_type(),
#{ #{
required => {false, recursively}, required => {false, recursively},
desc => <<"The authenticatior for this listener">> desc => <<"The authenticator for this listener">>
} }
)} )}
] ++ emqx_gateway_schema:proxy_protocol_opts(). ] ++ emqx_gateway_schema:proxy_protocol_opts().

View File

@ -28,9 +28,9 @@
-include_lib("typerefl/include/types.hrl"). -include_lib("typerefl/include/types.hrl").
-type ip_port() :: tuple(). -type ip_port() :: tuple().
-type duration() :: integer(). -type duration() :: non_neg_integer().
-type duration_s() :: integer(). -type duration_s() :: non_neg_integer().
-type bytesize() :: integer(). -type bytesize() :: pos_integer().
-type comma_separated_list() :: list(). -type comma_separated_list() :: list().
-typerefl_from_string({ip_port/0, emqx_schema, to_ip_port}). -typerefl_from_string({ip_port/0, emqx_schema, to_ip_port}).
@ -117,7 +117,7 @@ fields(stomp_frame) ->
[ [
{max_headers, {max_headers,
sc( sc(
integer(), non_neg_integer(),
#{ #{
default => 10, default => 10,
desc => "The maximum number of Header" desc => "The maximum number of Header"
@ -125,7 +125,7 @@ fields(stomp_frame) ->
)}, )},
{max_headers_length, {max_headers_length,
sc( sc(
integer(), non_neg_integer(),
#{ #{
default => 1024, default => 1024,
desc => "The maximum string length of the Header Value" desc => "The maximum string length of the Header Value"

View File

@ -264,4 +264,4 @@ properties() ->
]. ].
m(K, Desc) -> m(K, Desc) ->
{K, mk(integer(), #{desc => Desc})}. {K, mk(non_neg_integer(), #{desc => Desc})}.

View File

@ -63,72 +63,29 @@ fields(aggregate) ->
, example => false})} , example => false})}
]; ];
fields(node_stats_data) -> fields(node_stats_data) ->
[ { 'channels.count' [
, mk( integer(), #{ desc => <<"sessions.count">> stats_schema('channels.count', <<"sessions.count">>),
, example => 0})} stats_schema('channels.max', <<"session.max">>),
, { 'channels.max' stats_schema('connections.count', <<"Number of current connections">>),
, mk( integer(), #{ desc => <<"session.max">> stats_schema('connections.max', <<"Historical maximum number of connections">>),
, example => 0})} stats_schema('delayed.count', <<"Number of delayed messages">>),
, { 'connections.count' stats_schema('delayed.max', <<"Historical maximum number of delayed messages">>),
, mk( integer(), #{ desc => <<"Number of current connections">> stats_schema('live_connections.count', <<"Number of current live connections">>),
, example => 0})} stats_schema('live_connections.max', <<"Historical maximum number of live connections">>),
, { 'connections.max' stats_schema('retained.count', <<"Number of currently retained messages">>),
, mk( integer(), #{ desc => <<"Historical maximum number of connections">> stats_schema('retained.max', <<"Historical maximum number of retained messages">>),
, example => 0})} stats_schema('sessions.count', <<"Number of current sessions">>),
, { 'delayed.count' stats_schema('sessions.max', <<"Historical maximum number of sessions">>),
, mk( integer(), #{ desc => <<"Number of delayed messages">> stats_schema('suboptions.count', <<"subscriptions.count">>),
, example => 0})} stats_schema('suboptions.max', <<"subscriptions.max">>),
, { 'delayed.max' stats_schema('subscribers.count', <<"Number of current subscribers">>),
, mk( integer(), #{ desc => <<"Historical maximum number of delayed messages">> stats_schema('subscribers.max', <<"Historical maximum number of subscribers">>),
, example => 0})} stats_schema('subscriptions.count', <<"Number of current subscriptions, including shared subscriptions">>),
, { 'live_connections.count' stats_schema('subscriptions.max', <<"Historical maximum number of subscriptions">>),
, mk( integer(), #{ desc => <<"Number of current live connections">> stats_schema('subscriptions.shared.count', <<"Number of current shared subscriptions">>),
, example => 0})} stats_schema('subscriptions.shared.max', <<"Historical maximum number of shared subscriptions">>),
, { 'live_connections.max' stats_schema('topics.count', <<"Number of current topics">>),
, mk( integer(), #{ desc => <<"Historical maximum number of live connections">> stats_schema('topics.max', <<"Historical maximum number of topics">>)
, example => 0})}
, { 'retained.count'
, mk( integer(), #{ desc => <<"Number of currently retained messages">>
, example => 0})}
, { 'retained.max'
, mk( integer(), #{ desc => <<"Historical maximum number of retained messages">>
, example => 0})}
, { 'sessions.count'
, mk( integer(), #{ desc => <<"Number of current sessions">>
, example => 0})}
, { 'sessions.max'
, mk( integer(), #{ desc => <<"Historical maximum number of sessions">>
, example => 0})}
, { 'suboptions.count'
, mk( integer(), #{ desc => <<"subscriptions.count">>
, example => 0})}
, { 'suboptions.max'
, mk( integer(), #{ desc => <<"subscriptions.max">>
, example => 0})}
, { 'subscribers.count'
, mk( integer(), #{ desc => <<"Number of current subscribers">>
, example => 0})}
, { 'subscribers.max'
, mk( integer(), #{ desc => <<"Historical maximum number of subscribers">>
, example => 0})}
, { 'subscriptions.count'
, mk( integer(), #{ desc => <<"Number of current subscriptions, including shared subscriptions">>
, example => 0})}
, { 'subscriptions.max'
, mk( integer(), #{ desc => <<"Historical maximum number of subscriptions">>
, example => 0})}
, { 'subscriptions.shared.count'
, mk( integer(), #{ desc => <<"Number of current shared subscriptions">>
, example => 0})}
, { 'subscriptions.shared.max'
, mk( integer(), #{ desc => <<"Historical maximum number of shared subscriptions">>
, example => 0})}
, { 'topics.count'
, mk( integer(), #{ desc => <<"Number of current topics">>
, example => 0})}
, { 'topics.max'
, mk( integer(), #{ desc => <<"Historical maximum number of topics">>
, example => 0})}
]; ];
fields(aggergate_data) -> fields(aggergate_data) ->
[ { node [ { node
@ -136,6 +93,8 @@ fields(aggergate_data) ->
, example => <<"emqx@127.0.0.1">>})} , example => <<"emqx@127.0.0.1">>})}
] ++ fields(node_stats_data). ] ++ fields(node_stats_data).
stats_schema(Name, Desc) ->
{Name, mk(non_neg_integer(), #{desc => Desc, example => 0})}.
%%%============================================================================================== %%%==============================================================================================
%% api apply %% api apply

View File

@ -149,9 +149,9 @@ schema("/mqtt/delayed/messages") ->
[ [
{data, mk(hoconsc:array(ref("message")), #{})}, {data, mk(hoconsc:array(ref("message")), #{})},
{meta, [ {meta, [
{page, mk(integer(), #{})}, {page, mk(pos_integer(), #{})},
{limit, mk(integer(), #{})}, {limit, mk(pos_integer(), #{})},
{count, mk(integer(), #{})} {count, mk(non_neg_integer(), #{})}
]} ]}
] ]
} }
@ -163,11 +163,11 @@ fields("message_without_payload") ->
{msgid, mk(integer(), #{desc => <<"Message Id (MQTT message id hash)">>})}, {msgid, mk(integer(), #{desc => <<"Message Id (MQTT message id hash)">>})},
{node, mk(binary(), #{desc => <<"The node where message from">>})}, {node, mk(binary(), #{desc => <<"The node where message from">>})},
{publish_at, mk(binary(), #{desc => <<"Client publish message time, rfc 3339">>})}, {publish_at, mk(binary(), #{desc => <<"Client publish message time, rfc 3339">>})},
{delayed_interval, mk(integer(), #{desc => <<"Delayed interval, second">>})}, {delayed_interval, mk(pos_integer(), #{desc => <<"Delayed interval, second">>})},
{delayed_remaining, mk(integer(), #{desc => <<"Delayed remaining, second">>})}, {delayed_remaining, mk(non_neg_integer(), #{desc => <<"Delayed remaining, second">>})},
{expected_at, mk(binary(), #{desc => <<"Expect publish time, rfc 3339">>})}, {expected_at, mk(binary(), #{desc => <<"Expect publish time, rfc 3339">>})},
{topic, mk(binary(), #{desc => <<"Topic">>, example => <<"/sys/#">>})}, {topic, mk(binary(), #{desc => <<"Topic">>, example => <<"/sys/#">>})},
{qos, mk(binary(), #{desc => <<"QoS">>})}, {qos, mk(emqx_schema:qos(), #{desc => <<"QoS">>})},
{from_clientid, mk(binary(), #{desc => <<"From ClientId">>})}, {from_clientid, mk(binary(), #{desc => <<"From ClientId">>})},
{from_username, mk(binary(), #{desc => <<"From Username">>})} {from_username, mk(binary(), #{desc => <<"From Username">>})}
]; ];

View File

@ -307,17 +307,19 @@ calculate_rate(CurrVal, #rate{max = MaxRate0, last_v = LastVal,
%% calculate the max rate since the emqx startup %% calculate the max rate since the emqx startup
MaxRate = MaxRate =
if MaxRate0 >= CurrRate -> MaxRate0; case MaxRate0 >= CurrRate of
true -> CurrRate true -> MaxRate0;
false -> CurrRate
end, end,
%% calculate the average rate in last 5 mins %% calculate the average rate in last 5 mins
{Last5MinSamples, Acc5Min, Last5Min} = {Last5MinSamples, Acc5Min, Last5Min} =
if Tick =< ?SAMPCOUNT_5M -> case Tick =< ?SAMPCOUNT_5M of
true ->
Acc = AccRate5Min0 + CurrRate, Acc = AccRate5Min0 + CurrRate,
{lists:reverse([CurrRate | lists:reverse(Last5MinSamples0)]), {lists:reverse([CurrRate | lists:reverse(Last5MinSamples0)]),
Acc, Acc / Tick}; Acc, Acc / Tick};
true -> false ->
[FirstRate | Rates] = Last5MinSamples0, [FirstRate | Rates] = Last5MinSamples0,
Acc = AccRate5Min0 + CurrRate - FirstRate, Acc = AccRate5Min0 + CurrRate - FirstRate,
{lists:reverse([CurrRate | lists:reverse(Rates)]), {lists:reverse([CurrRate | lists:reverse(Rates)]),

View File

@ -53,7 +53,7 @@ fields("rule_info") ->
})} })}
] ++ fields("rule_creation"); ] ++ fields("rule_creation");
%% TODO: we can delete this API if the Dashboard not denpends on it %% TODO: we can delete this API if the Dashboard not depends on it
fields("rule_events") -> fields("rule_events") ->
ETopics = [binary_to_atom(emqx_rule_events:event_topic(E)) || E <- emqx_rule_events:event_names()], ETopics = [binary_to_atom(emqx_rule_events:event_topic(E)) || E <- emqx_rule_events:event_names()],
[ {"event", sc(hoconsc:enum(ETopics), #{desc => "The event topics", required => true})} [ {"event", sc(hoconsc:enum(ETopics), #{desc => "The event topics", required => true})}
@ -83,39 +83,39 @@ fields("rule_test") ->
]; ];
fields("metrics") -> fields("metrics") ->
[ {"sql.matched", sc(integer(), #{ [ {"sql.matched", sc(non_neg_integer(), #{
desc => "How much times the FROM clause of the SQL is matched." desc => "How much times the FROM clause of the SQL is matched."
})} })}
, {"sql.matched.rate", sc(float(), #{desc => "The rate of matched, times/second"})} , {"sql.matched.rate", sc(float(), #{desc => "The rate of matched, times/second"})}
, {"sql.matched.rate.max", sc(float(), #{desc => "The max rate of matched, times/second"})} , {"sql.matched.rate.max", sc(float(), #{desc => "The max rate of matched, times/second"})}
, {"sql.matched.rate.last5m", sc(float(), , {"sql.matched.rate.last5m", sc(float(),
#{desc => "The average rate of matched in last 5 minutes, times/second"})} #{desc => "The average rate of matched in last 5 minutes, times/second"})}
, {"sql.passed", sc(integer(), #{desc => "How much times the SQL is passed"})} , {"sql.passed", sc(non_neg_integer(), #{desc => "How much times the SQL is passed"})}
, {"sql.failed", sc(integer(), #{desc => "How much times the SQL is failed"})} , {"sql.failed", sc(non_neg_integer(), #{desc => "How much times the SQL is failed"})}
, {"sql.failed.exception", sc(integer(), #{ , {"sql.failed.exception", sc(non_neg_integer(), #{
desc => "How much times the SQL is failed due to exceptions. " desc => "How much times the SQL is failed due to exceptions. "
"This may because of a crash when calling a SQL function, or " "This may because of a crash when calling a SQL function, or "
"trying to do arithmetic operation on undefined variables" "trying to do arithmetic operation on undefined variables"
})} })}
, {"sql.failed.unknown", sc(integer(), #{ , {"sql.failed.unknown", sc(non_neg_integer(), #{
desc => "How much times the SQL is failed due to an unknown error." desc => "How much times the SQL is failed due to an unknown error."
})} })}
, {"outputs.total", sc(integer(), #{ , {"outputs.total", sc(non_neg_integer(), #{
desc => "How much times the outputs are called by the rule. " desc => "How much times the outputs are called by the rule. "
"This value may several times of 'sql.matched', depending on the " "This value may several times of 'sql.matched', depending on the "
"number of the outputs of the rule." "number of the outputs of the rule."
})} })}
, {"outputs.success", sc(integer(), #{ , {"outputs.success", sc(non_neg_integer(), #{
desc => "How much times the rule success to call the outputs." desc => "How much times the rule success to call the outputs."
})} })}
, {"outputs.failed", sc(integer(), #{ , {"outputs.failed", sc(non_neg_integer(), #{
desc => "How much times the rule failed to call the outputs." desc => "How much times the rule failed to call the outputs."
})} })}
, {"outputs.failed.out_of_service", sc(integer(), #{ , {"outputs.failed.out_of_service", sc(non_neg_integer(), #{
desc => "How much times the rule failed to call outputs due to the output is " desc => "How much times the rule failed to call outputs due to the output is "
"out of service. For example, a bridge is disabled or stopped." "out of service. For example, a bridge is disabled or stopped."
})} })}
, {"outputs.failed.unknown", sc(integer(), #{ , {"outputs.failed.unknown", sc(non_neg_integer(), #{
desc => "How much times the rule failed to call outputs due to to an unknown error." desc => "How much times the rule failed to call outputs due to to an unknown error."
})} })}
]; ];

View File

@ -99,7 +99,7 @@ rule_info_schema() ->
schema("/rules") -> schema("/rules") ->
#{ #{
operationId => '/rules', 'operationId' => '/rules',
get => #{ get => #{
tags => [<<"rules">>], tags => [<<"rules">>],
description => <<"List all rules">>, description => <<"List all rules">>,
@ -111,7 +111,7 @@ schema("/rules") ->
tags => [<<"rules">>], tags => [<<"rules">>],
description => <<"Create a new rule using given Id">>, description => <<"Create a new rule using given Id">>,
summary => <<"Create a Rule">>, summary => <<"Create a Rule">>,
requestBody => rule_creation_schema(), 'requestBody' => rule_creation_schema(),
responses => #{ responses => #{
400 => error_schema('BAD_REQUEST', "Invalid Parameters"), 400 => error_schema('BAD_REQUEST', "Invalid Parameters"),
201 => rule_info_schema() 201 => rule_info_schema()
@ -120,7 +120,7 @@ schema("/rules") ->
schema("/rule_events") -> schema("/rule_events") ->
#{ #{
operationId => '/rule_events', 'operationId' => '/rule_events',
get => #{ get => #{
tags => [<<"rules">>], tags => [<<"rules">>],
description => <<"List all events can be used in rules">>, description => <<"List all events can be used in rules">>,
@ -133,7 +133,7 @@ schema("/rule_events") ->
schema("/rules/:id") -> schema("/rules/:id") ->
#{ #{
operationId => '/rules/:id', 'operationId' => '/rules/:id',
get => #{ get => #{
tags => [<<"rules">>], tags => [<<"rules">>],
description => <<"Get a rule by given Id">>, description => <<"Get a rule by given Id">>,
@ -149,7 +149,7 @@ schema("/rules/:id") ->
description => <<"Update a rule by given Id to all nodes in the cluster">>, description => <<"Update a rule by given Id to all nodes in the cluster">>,
summary => <<"Update a Rule">>, summary => <<"Update a Rule">>,
parameters => param_path_id(), parameters => param_path_id(),
requestBody => rule_creation_schema(), 'requestBody' => rule_creation_schema(),
responses => #{ responses => #{
400 => error_schema('BAD_REQUEST', "Invalid Parameters"), 400 => error_schema('BAD_REQUEST', "Invalid Parameters"),
200 => rule_info_schema() 200 => rule_info_schema()
@ -168,7 +168,7 @@ schema("/rules/:id") ->
schema("/rules/:id/reset_metrics") -> schema("/rules/:id/reset_metrics") ->
#{ #{
operationId => '/rules/:id/reset_metrics', 'operationId' => '/rules/:id/reset_metrics',
put => #{ put => #{
tags => [<<"rules">>], tags => [<<"rules">>],
description => <<"Reset a rule metrics">>, description => <<"Reset a rule metrics">>,
@ -183,12 +183,12 @@ schema("/rules/:id/reset_metrics") ->
schema("/rule_test") -> schema("/rule_test") ->
#{ #{
operationId => '/rule_test', 'operationId' => '/rule_test',
post => #{ post => #{
tags => [<<"rules">>], tags => [<<"rules">>],
description => <<"Test a rule">>, description => <<"Test a rule">>,
summary => <<"Test a Rule">>, summary => <<"Test a Rule">>,
requestBody => rule_test_schema(), 'requestBody' => rule_test_schema(),
responses => #{ responses => #{
400 => error_schema('BAD_REQUEST', "Invalid Parameters"), 400 => error_schema('BAD_REQUEST', "Invalid Parameters"),
412 => error_schema('NOT_MATCH', "SQL Not Match"), 412 => error_schema('NOT_MATCH', "SQL Not Match"),

View File

@ -48,9 +48,9 @@
-spec(apply_rules(list(rule()), input()) -> ok). -spec(apply_rules(list(rule()), input()) -> ok).
apply_rules([], _Input) -> apply_rules([], _Input) ->
ok; ok;
apply_rules([#{enable := false}|More], Input) -> apply_rules([#{enable := false} | More], Input) ->
apply_rules(More, Input); apply_rules(More, Input);
apply_rules([Rule|More], Input) -> apply_rules([Rule | More], Input) ->
apply_rule_discard_result(Rule, Input), apply_rule_discard_result(Rule, Input),
apply_rules(More, Input). apply_rules(More, Input).
@ -150,14 +150,14 @@ select_and_transform(Fields, Input) ->
select_and_transform([], _Input, Output) -> select_and_transform([], _Input, Output) ->
Output; Output;
select_and_transform(['*'|More], Input, Output) -> select_and_transform(['*' | More], Input, Output) ->
select_and_transform(More, Input, maps:merge(Output, Input)); select_and_transform(More, Input, maps:merge(Output, Input));
select_and_transform([{as, Field, Alias}|More], Input, Output) -> select_and_transform([{as, Field, Alias} | More], Input, Output) ->
Val = eval(Field, Input), Val = eval(Field, Input),
select_and_transform(More, select_and_transform(More,
nested_put(Alias, Val, Input), nested_put(Alias, Val, Input),
nested_put(Alias, Val, Output)); nested_put(Alias, Val, Output));
select_and_transform([Field|More], Input, Output) -> select_and_transform([Field | More], Input, Output) ->
Val = eval(Field, Input), Val = eval(Field, Input),
Key = alias(Field), Key = alias(Field),
select_and_transform(More, select_and_transform(More,
@ -172,7 +172,7 @@ select_and_collect(Fields, Input) ->
select_and_collect([{as, Field, {_, A} = Alias}], Input, {Output, _}) -> select_and_collect([{as, Field, {_, A} = Alias}], Input, {Output, _}) ->
Val = eval(Field, Input), Val = eval(Field, Input),
{nested_put(Alias, Val, Output), {A, ensure_list(Val)}}; {nested_put(Alias, Val, Output), {A, ensure_list(Val)}};
select_and_collect([{as, Field, Alias}|More], Input, {Output, LastKV}) -> select_and_collect([{as, Field, Alias} | More], Input, {Output, LastKV}) ->
Val = eval(Field, Input), Val = eval(Field, Input),
select_and_collect(More, select_and_collect(More,
nested_put(Alias, Val, Input), nested_put(Alias, Val, Input),
@ -181,7 +181,7 @@ select_and_collect([Field], Input, {Output, _}) ->
Val = eval(Field, Input), Val = eval(Field, Input),
Key = alias(Field), Key = alias(Field),
{nested_put(Key, Val, Output), {'item', ensure_list(Val)}}; {nested_put(Key, Val, Output), {'item', ensure_list(Val)}};
select_and_collect([Field|More], Input, {Output, LastKV}) -> select_and_collect([Field | More], Input, {Output, LastKV}) ->
Val = eval(Field, Input), Val = eval(Field, Input),
Key = alias(Field), Key = alias(Field),
select_and_collect(More, select_and_collect(More,

View File

@ -50,8 +50,8 @@ schema(("/slow_subscriptions")) ->
}, },
get => #{tags => [<<"slow subs">>], get => #{tags => [<<"slow subs">>],
description => <<"Get slow topics statistics record data">>, description => <<"Get slow topics statistics record data">>,
parameters => [ {page, mk(integer(), #{in => query})} parameters => [ {page, mk(pos_integer(), #{in => query})}
, {limit, mk(integer(), #{in => query})} , {limit, mk(pos_integer(), #{in => query})}
], ],
'requestBody' => [], 'requestBody' => [],
responses => #{200 => [{data, mk(hoconsc:array(ref(record)), #{})}]} responses => #{200 => [{data, mk(hoconsc:array(ref(record)), #{})}]}

View File

@ -19,7 +19,7 @@ fields("slow_subs") ->
"300s", "300s",
"The eviction time of the record, which in the statistics record table.")} "The eviction time of the record, which in the statistics record table.")}
, {top_k_num, , {top_k_num,
sc(integer(), sc(pos_integer(),
10, 10,
"The maximum number of records in the slow subscription statistics record table.")} "The maximum number of records in the slow subscription statistics record table.")}
, {stats_type, , {stats_type,

View File

@ -96,7 +96,7 @@ t_get_history(_) ->
lists:foreach(Each, lists:seq(1, 5)), lists:foreach(Each, lists:seq(1, 5)),
{ok, Data} = request_api(get, api_path(["slow_subscriptions"]), "_page=1&_limit=10", {ok, Data} = request_api(get, api_path(["slow_subscriptions"]), "page=1&limit=10",
auth_header_()), auth_header_()),
#{<<"data">> := [First | _]} = emqx_json:decode(Data, [return_maps]), #{<<"data">> := [First | _]} = emqx_json:decode(Data, [return_maps]),

View File

@ -48,7 +48,7 @@ defmodule EMQXUmbrella.MixProject do
[ [
{:lc, github: "emqx/lc", tag: "0.2.1"}, {:lc, github: "emqx/lc", tag: "0.2.1"},
{:redbug, "2.0.7"}, {:redbug, "2.0.7"},
{:typerefl, github: "ieQu1/typerefl", tag: "0.8.6", override: true}, {:typerefl, github: "ieQu1/typerefl", tag: "0.9.0", override: true},
{:ehttpc, github: "emqx/ehttpc", tag: "0.1.12"}, {:ehttpc, github: "emqx/ehttpc", tag: "0.1.12"},
{:gproc, github: "uwiger/gproc", tag: "0.8.0", override: true}, {:gproc, github: "uwiger/gproc", tag: "0.8.0", override: true},
{:jiffy, github: "emqx/jiffy", tag: "1.0.5", override: true}, {:jiffy, github: "emqx/jiffy", tag: "1.0.5", override: true},

View File

@ -47,7 +47,7 @@
[ {lc, {git, "https://github.com/emqx/lc.git", {tag, "0.2.1"}}} [ {lc, {git, "https://github.com/emqx/lc.git", {tag, "0.2.1"}}}
, {redbug, "2.0.7"} , {redbug, "2.0.7"}
, {gpb, "4.11.2"} %% gpb only used to build, but not for release, pin it here to avoid fetching a wrong version due to rebar plugins scattered in all the deps , {gpb, "4.11.2"} %% gpb only used to build, but not for release, pin it here to avoid fetching a wrong version due to rebar plugins scattered in all the deps
, {typerefl, {git, "https://github.com/ieQu1/typerefl", {tag, "0.8.6"}}} , {typerefl, {git, "https://github.com/ieQu1/typerefl", {tag, "0.9.0"}}}
, {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.1.12"}}} , {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.1.12"}}}
, {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}} , {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}}
, {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}} , {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}