Merge pull request #5544 from turtleDeng/refactor-mgmt-schema-util

refactor(schema-utils): refactor mgmt swagger schema utils
This commit is contained in:
turtleDeng 2021-08-24 11:46:48 +08:00 committed by GitHub
commit 335b8fec63
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
26 changed files with 806 additions and 1195 deletions

View File

@ -30,10 +30,12 @@
-include("emqx_dashboard.hrl").
-import(emqx_mgmt_util, [ response_schema/1
, response_schema/2
, request_body_schema/1
, response_array_schema/2
-import(emqx_mgmt_util, [ schema/1
, object_schema/1
, object_schema/2
, object_array_schema/1
, bad_request/0
, properties/1
]).
-export([api_spec/0]).
@ -58,95 +60,59 @@ api_spec() ->
[]}.
login_api() ->
AuthSchema = #{
type => object,
properties => #{
username => #{
type => string,
description => <<"Username">>},
password => #{
type => string,
description => <<"Password">>}}},
TokenSchema = #{
type => object,
properties => #{
token => #{
type => string,
description => <<"JWT Token">>},
license => #{
type => object,
properties => #{
edition => #{
type => string,
enum => [community, enterprise]}}},
version => #{
type => string}}},
AuthProps = properties([{username, string, <<"Username">>},
{password, string, <<"Password">>}]),
TokenProps = properties([{token, string, <<"JWT Token">>},
{license, object, [{edition, string, <<"License">>, [community, enterprise]}]},
{version, string}]),
Metadata = #{
post => #{
tags => [dashboard],
description => <<"Dashboard Auth">>,
'requestBody' => request_body_schema(AuthSchema),
'requestBody' => object_schema(AuthProps),
responses => #{
<<"200">> =>
response_schema(<<"Dashboard Auth successfully">>, TokenSchema),
object_schema(TokenProps, <<"Dashboard Auth successfully">>),
<<"401">> => unauthorized_request()
},
security => []
}
},
{"/login", Metadata, login}.
logout_api() ->
AuthSchema = #{
type => object,
properties => #{
username => #{
type => string,
description => <<"Username">>}}},
LogoutProps = properties([{username, string, <<"Username">>}]),
Metadata = #{
post => #{
tags => [dashboard],
description => <<"Dashboard Auth">>,
'requestBody' => request_body_schema(AuthSchema),
'requestBody' => object_schema(LogoutProps),
responses => #{
<<"200">> =>
response_schema(<<"Dashboard Auth successfully">>)}
<<"200">> => schema(<<"Dashboard Auth successfully">>)
}
}
},
{"/logout", Metadata, logout}.
users_api() ->
ShowSchema = #{
type => object,
properties => #{
username => #{
type => string,
description => <<"Username">>},
tag => #{
type => string,
description => <<"Tag">>}}},
CreateSchema = #{
type => object,
properties => #{
username => #{
type => string,
description => <<"Username">>},
password => #{
type => string,
description => <<"Password">>},
tag => #{
type => string,
description => <<"Tag">>}}},
BaseProps = properties([{username, string, <<"Username">>},
{password, string, <<"Password">>},
{tag, string, <<"Tag">>}]),
Metadata = #{
get => #{
tags => [dashboard],
description => <<"Get dashboard users">>,
responses => #{
<<"200">> => response_array_schema(<<"">>, ShowSchema)
<<"200">> => object_array_schema(maps:without([password], BaseProps))
}
},
post => #{
tags => [dashboard],
description => <<"Create dashboard users">>,
'requestBody' => request_body_schema(CreateSchema),
'requestBody' => object_schema(BaseProps),
responses => #{
<<"200">> => response_schema(<<"Create Users successfully">>),
<<"200">> => schema(<<"Create Users successfully">>),
<<"400">> => bad_request()
}
}
@ -156,26 +122,21 @@ users_api() ->
user_api() ->
Metadata = #{
delete => #{
tags => [dashboard],
description => <<"Delete dashboard users">>,
parameters => [path_param_username()],
parameters => parameters(),
responses => #{
<<"200">> => response_schema(<<"Delete User successfully">>),
<<"200">> => schema(<<"Delete User successfully">>),
<<"400">> => bad_request()
}
},
put => #{
tags => [dashboard],
description => <<"Update dashboard users">>,
parameters => [path_param_username()],
'requestBody' => request_body_schema(#{
type => object,
properties => #{
tag => #{
type => string
}
}
}),
parameters => parameters(),
'requestBody' => object_schema(properties([{tag, string, <<"Tag">>}])),
responses => #{
<<"200">> => response_schema(<<"Update Users successfully">>),
<<"200">> => schema(<<"Update Users successfully">>),
<<"400">> => bad_request()
}
}
@ -185,36 +146,18 @@ user_api() ->
change_pwd_api() ->
Metadata = #{
put => #{
tags => [dashboard],
description => <<"Update dashboard users password">>,
parameters => [path_param_username()],
'requestBody' => request_body_schema(#{
type => object,
properties => #{
old_pwd => #{
type => string
},
new_pwd => #{
type => string
}
}
}),
parameters => parameters(),
'requestBody' => object_schema(properties([old_pwd, new_pwd])),
responses => #{
<<"200">> => response_schema(<<"Update Users password successfully">>),
<<"200">> => schema(<<"Update Users password successfully">>),
<<"400">> => bad_request()
}
}
},
{"/users/:username/change_pwd", Metadata, change_pwd}.
path_param_username() ->
#{
name => username,
in => path,
required => true,
schema => #{type => string},
example => <<"admin">>
}.
login(post, Request) ->
{ok, Body, _} = cowboy_req:read_body(Request),
Params = emqx_json:decode(Body, [return_maps]),
@ -292,21 +235,19 @@ change_pwd(put, Request) ->
row(#mqtt_admin{username = Username, tags = Tag}) ->
#{username => Username, tag => Tag}.
bad_request() ->
response_schema(<<"Bad Request">>,
#{
type => object,
properties => #{
message => #{type => string},
code => #{type => string}
}
}).
parameters() ->
[#{
name => username,
in => path,
required => true,
schema => #{type => string},
example => <<"admin">>
}].
unauthorized_request() ->
response_schema(<<"Unauthorized">>,
#{
type => object,
properties => #{
message => #{type => string},
code => #{type => string, enum => ['PASSWORD_ERROR', 'USERNAME_ERROR']}
}
}).
object_schema(
properties([{message, string},
{code, string, <<"Resp Code">>, ['PASSWORD_ERROR','USERNAME_ERROR']}
]),
<<"Unauthorized">>
).

View File

@ -8,6 +8,7 @@
-behaviour(minirest_api).
-import(emqx_mgmt_util, [schema/2]).
-export([api_spec/0]).
-export([ monitor/2
@ -47,7 +48,7 @@ monitor_api() ->
}
],
responses => #{
<<"200">> => emqx_mgmt_util:response_schema(<<"Monitor count data">>, counters_schema())}}},
<<"200">> => schema(counters_schema(), <<"Monitor count data">>)}}},
{"/monitor", Metadata, monitor}.
monitor_nodes_api() ->
@ -56,7 +57,7 @@ monitor_nodes_api() ->
description => <<"List monitor data">>,
parameters => [path_param_node()],
responses => #{
<<"200">> => emqx_mgmt_util:response_schema(<<"Monitor count data in node">>, counters_schema())}}},
<<"200">> => schema(counters_schema(), <<"Monitor count data in node">>)}}},
{"/monitor/nodes/:node", Metadata, monitor_nodes}.
monitor_nodes_counters_api() ->
@ -68,7 +69,7 @@ monitor_nodes_counters_api() ->
path_param_counter()
],
responses => #{
<<"200">> => emqx_mgmt_util:response_schema(<<"Monitor single count data in node">>, counter_schema())}}},
<<"200">> => schema(counter_schema(), <<"Monitor single count data in node">>)}}},
{"/monitor/nodes/:node/counters/:counter", Metadata, monitor_nodes_counters}.
monitor_counters_api() ->
@ -80,15 +81,14 @@ monitor_counters_api() ->
],
responses => #{
<<"200">> =>
emqx_mgmt_util:response_schema(<<"Monitor single count data">>, counter_schema())}}},
schema(counter_schema(), <<"Monitor single count data">>)}}},
{"/monitor/counters/:counter", Metadata, counters}.
monitor_current_api() ->
Metadata = #{
get => #{
description => <<"Current monitor data">>,
responses => #{
<<"200">> => emqx_mgmt_util:response_schema(<<"Current monitor data">>,
current_counters_schema())}}},
<<"200">> => schema(current_counters_schema(), <<"Current monitor data">>)}}},
{"/monitor/current", Metadata, current_counters}.
path_param_node() ->

View File

@ -28,38 +28,28 @@
-define(ACTIVATED_ALARM, emqx_activated_alarm).
-define(DEACTIVATED_ALARM, emqx_deactivated_alarm).
api_spec() ->
{[alarms_api()], [alarm_schema()]}.
-import(emqx_mgmt_util, [ object_array_schema/2
, schema/1
, properties/1
]).
alarm_schema() ->
#{
alarm => #{
type => object,
properties => #{
node => #{
type => string,
description => <<"Alarm in node">>},
name => #{
type => string,
description => <<"Alarm name">>},
message => #{
type => string,
description => <<"Alarm readable information">>},
details => #{
type => object,
description => <<"Alarm detail">>},
duration => #{
type => integer,
description => <<"Alarms duration time; UNIX time stamp">>}
}
}
}.
api_spec() ->
{[alarms_api()], []}.
properties() ->
properties([
{node, string, <<"Alarm in node">>},
{name, string, <<"Alarm name">>},
{message, string, <<"Alarm readable information">>},
{details, object},
{duration, integer, <<"Alarms duration time; UNIX time stamp">>}
]).
alarms_api() ->
Metadata = #{
get => #{
description => <<"EMQ X alarms">>,
parameters => [#{
parameters => emqx_mgmt_util:page_params() ++ [#{
name => activated,
in => query,
description => <<"All alarms, if not specified">>,
@ -68,37 +58,36 @@ alarms_api() ->
}],
responses => #{
<<"200">> =>
emqx_mgmt_util:response_array_schema(<<"List all alarms">>, alarm)}},
object_array_schema(properties(), <<"List all alarms">>)}},
delete => #{
description => <<"Remove all deactivated alarms">>,
responses => #{
<<"200">> =>
emqx_mgmt_util:response_schema(<<"Remove all deactivated alarms ok">>)}}},
schema(<<"Remove all deactivated alarms ok">>)}}},
{"/alarms", Metadata, alarms}.
%%%==============================================================================================
%% parameters trans
alarms(get, Request) ->
case proplists:get_value(<<"activated">>, cowboy_req:parse_qs(Request), undefined) of
undefined ->
list(#{activated => undefined});
<<"true">> ->
list(#{activated => true});
<<"false">> ->
list(#{activated => false})
end;
Params = cowboy_req:parse_qs(Request),
list(Params);
alarms(delete, _Request) ->
delete().
%%%==============================================================================================
%% api apply
list(#{activated := true}) ->
do_list(activated);
list(#{activated := false}) ->
do_list(deactivated);
list(#{activated := undefined}) ->
do_list(activated).
list(Params) ->
{Table, Function} =
case proplists:get_value(<<"activated">>, Params, <<"true">>) of
<<"true">> ->
{?ACTIVATED_ALARM, query_activated};
<<"false">> ->
{?DEACTIVATED_ALARM, query_deactivated}
end,
Params1 = proplists:delete(<<"activated">>, Params),
Response = emqx_mgmt_api:cluster_query(Params1, {Table, []}, {?MODULE, Function}),
{200, Response}.
delete() ->
_ = emqx_mgmt:delete_all_deactivated_alarms(),
@ -106,17 +95,6 @@ delete() ->
%%%==============================================================================================
%% internal
do_list(Type) ->
{Table, Function} =
case Type of
activated ->
{?ACTIVATED_ALARM, query_activated};
deactivated ->
{?DEACTIVATED_ALARM, query_deactivated}
end,
Response = emqx_mgmt_api:cluster_query([], {Table, []}, {?MODULE, Function}),
{200, Response}.
query_activated(_, Start, Limit) ->
query(?ACTIVATED_ALARM, Start, Limit).

View File

@ -18,8 +18,17 @@
-behaviour(minirest_api).
-export([api_spec/0]).
-import(emqx_mgmt_util, [ schema/1
, schema/2
, object_schema/1
, object_schema/2
, object_array_schema/2
, error_schema/1
, error_schema/2
, properties/1
]).
-export([api_spec/0]).
-export([ apps/2
, app/2]).
@ -30,48 +39,22 @@
api_spec() ->
{
[apps_api(), app_api()],
[app_schema(), app_secret_schema()]
[]
}.
app_schema() ->
#{app => #{
type => object,
properties => app_properties()}}.
app_properties() ->
#{
app_id => #{
type => string,
description => <<"App ID">>},
secret => #{
type => string,
description => <<"App Secret">>},
name => #{
type => string,
description => <<"Dsiplay name">>},
desc => #{
type => string,
description => <<"App description">>},
status => #{
type => boolean,
description => <<"Enable or disable">>},
expired => #{
type => integer,
description => <<"Expired time">>}
}.
app_secret_schema() ->
#{app_secret => #{
type => object,
properties => #{
secret => #{type => string}}}}.
properties() ->
properties([
{app_id, string, <<"App ID">>},
{secret, string, <<"App Secret">>},
{name, string, <<"Dsiplay name">>},
{desc, string, <<"App description">>},
{status, boolean, <<"Enable or disable">>},
{expired, integer, <<"Expired time">>}
]).
%% not export schema
app_without_secret_schema() ->
#{
type => object,
properties => maps:without([secret], app_properties())
}.
maps:without([secret], properties()).
apps_api() ->
Metadata = #{
@ -79,16 +62,20 @@ apps_api() ->
description => <<"List EMQ X apps">>,
responses => #{
<<"200">> =>
emqx_mgmt_util:response_array_schema(<<"All apps">>,
app_without_secret_schema())}},
object_array_schema(app_without_secret_schema(), <<"All apps">>)
}
},
post => #{
description => <<"EMQ X create apps">>,
'requestBody' => emqx_mgmt_util:request_body_schema(<<"app">>),
'requestBody' => schema(app),
responses => #{
<<"200">> =>
emqx_mgmt_util:response_schema(<<"Create apps">>, app_secret),
schema(app_secret, <<"Create apps">>),
<<"400">> =>
emqx_mgmt_util:response_error_schema(<<"App ID already exist">>, [?BAD_APP_ID])}}},
error_schema(<<"App ID already exist">>, [?BAD_APP_ID])
}
}
},
{"/apps", Metadata, apps}.
app_api() ->
@ -102,9 +89,9 @@ app_api() ->
schema => #{type => string}}],
responses => #{
<<"404">> =>
emqx_mgmt_util:response_error_schema(<<"App id not found">>),
error_schema(<<"App id not found">>),
<<"200">> =>
emqx_mgmt_util:response_schema(<<"Get App">>, app_without_secret_schema())}},
object_schema(app_without_secret_schema(), <<"Get App">>)}},
delete => #{
description => <<"EMQ X apps">>,
parameters => [#{
@ -114,7 +101,7 @@ app_api() ->
schema => #{type => string}
}],
responses => #{
<<"200">> => emqx_mgmt_util:response_schema(<<"Remove app ok">>)}},
<<"200">> => schema(<<"Remove app ok">>)}},
put => #{
description => <<"EMQ X update apps">>,
parameters => [#{
@ -123,12 +110,12 @@ app_api() ->
required => true,
schema => #{type => string}
}],
'requestBody' => emqx_mgmt_util:request_body_schema(app_without_secret_schema()),
'requestBody' => object_schema(app_without_secret_schema()),
responses => #{
<<"404">> =>
emqx_mgmt_util:response_error_schema(<<"App id not found">>, [?BAD_APP_ID]),
error_schema(<<"App id not found">>, [?BAD_APP_ID]),
<<"200">> =>
emqx_mgmt_util:response_schema(<<"Update ok">>, app_without_secret_schema())}}},
object_schema(app_without_secret_schema(), <<"Update ok">>)}}},
{"/apps/:app_id", Metadata, app}.
%%%==============================================================================================

View File

@ -333,7 +333,7 @@ clients_api() ->
}
],
responses => #{
<<"200">> => emqx_mgmt_util:response_array_schema(<<"List clients 200 OK">>, client)}}},
<<"200">> => emqx_mgmt_util:array_schema(client, <<"List clients 200 OK">>)}}},
{"/clients", Metadata, clients}.
client_api() ->
@ -347,8 +347,8 @@ client_api() ->
required => true
}],
responses => #{
<<"404">> => emqx_mgmt_util:response_error_schema(<<"Client id not found">>),
<<"200">> => emqx_mgmt_util:response_schema(<<"List clients 200 OK">>, client)}},
<<"404">> => emqx_mgmt_util:error_schema(<<"Client id not found">>),
<<"200">> => emqx_mgmt_util:schema(client, <<"List clients 200 OK">>)}},
delete => #{
description => <<"Kick out client by client ID">>,
parameters => [#{
@ -358,8 +358,8 @@ client_api() ->
required => true
}],
responses => #{
<<"404">> => emqx_mgmt_util:response_error_schema(<<"Client id not found">>),
<<"200">> => emqx_mgmt_util:response_schema(<<"List clients 200 OK">>, client)}}},
<<"404">> => emqx_mgmt_util:error_schema(<<"Client id not found">>),
<<"200">> => emqx_mgmt_util:schema(client, <<"List clients 200 OK">>)}}},
{"/clients/:clientid", Metadata, client}.
clients_authz_cache_api() ->
@ -373,8 +373,8 @@ clients_authz_cache_api() ->
required => true
}],
responses => #{
<<"404">> => emqx_mgmt_util:response_error_schema(<<"Client id not found">>),
<<"200">> => emqx_mgmt_util:response_schema(<<"Get client authz cache">>, <<"authz_cache">>)}},
<<"404">> => emqx_mgmt_util:error_schema(<<"Client id not found">>),
<<"200">> => emqx_mgmt_util:schema(authz_cache, <<"Get client authz cache">>)}},
delete => #{
description => <<"Clean client authz cache">>,
parameters => [#{
@ -384,8 +384,8 @@ clients_authz_cache_api() ->
required => true
}],
responses => #{
<<"404">> => emqx_mgmt_util:response_error_schema(<<"Client id not found">>),
<<"200">> => emqx_mgmt_util:response_schema(<<"Delete clients 200 OK">>)}}},
<<"404">> => emqx_mgmt_util:error_schema(<<"Client id not found">>),
<<"200">> => emqx_mgmt_util:schema(<<"Delete clients 200 OK">>)}}},
{"/clients/:clientid/authz_cache", Metadata, authz_cache}.
clients_subscriptions_api() ->
@ -400,7 +400,7 @@ clients_subscriptions_api() ->
}],
responses => #{
<<"200">> =>
emqx_mgmt_util:response_array_schema(<<"Get client subscriptions">>, subscription)}}
emqx_mgmt_util:array_schema(subscription, <<"Get client subscriptions">>)}}
},
{"/clients/:clientid/subscriptions", Metadata, subscriptions}.
@ -416,15 +416,15 @@ unsubscribe_api() ->
required => true
}
],
'requestBody' => emqx_mgmt_util:request_body_schema(#{
'requestBody' => emqx_mgmt_util:schema(#{
type => object,
properties => #{
topic => #{
type => string,
description => <<"Topic">>}}}),
responses => #{
<<"404">> => emqx_mgmt_util:response_error_schema(<<"Client id not found">>),
<<"200">> => emqx_mgmt_util:response_schema(<<"Unsubscribe ok">>)}}},
<<"404">> => emqx_mgmt_util:error_schema(<<"Client id not found">>),
<<"200">> => emqx_mgmt_util:schema(<<"Unsubscribe ok">>)}}},
{"/clients/:clientid/unsubscribe", Metadata, unsubscribe}.
subscribe_api() ->
Metadata = #{
@ -436,7 +436,7 @@ subscribe_api() ->
schema => #{type => string},
required => true
}],
'requestBody' => emqx_mgmt_util:request_body_schema(#{
'requestBody' => emqx_mgmt_util:schema(#{
type => object,
properties => #{
topic => #{
@ -448,8 +448,8 @@ subscribe_api() ->
example => 0,
description => <<"QoS">>}}}),
responses => #{
<<"404">> => emqx_mgmt_util:response_error_schema(<<"Client id not found">>),
<<"200">> => emqx_mgmt_util:response_schema(<<"Subscribe ok">>)}}},
<<"404">> => emqx_mgmt_util:error_schema(<<"Client id not found">>),
<<"200">> => emqx_mgmt_util:schema(<<"Subscribe ok">>)}}},
{"/clients/:clientid/subscribe", Metadata, subscribe}.
%%%==============================================================================================

View File

@ -18,6 +18,11 @@
-behaviour(minirest_api).
-import(emqx_mgmt_util, [ schema/1
, schema/2
, error_schema/2
]).
-export([api_spec/0]).
-export([ config/2
@ -34,15 +39,6 @@
schema => #{type => string, default => <<".">>}
}]).
-define(TEXT_BODY(DESCR, SCHEMA), #{
description => list_to_binary(DESCR),
content => #{
<<"text/plain">> => #{
schema => SCHEMA
}
}
}).
-define(PREFIX, "/configs").
-define(PREFIX_RESET, "/configs_reset").
@ -69,18 +65,16 @@ config_api(ConfPath, Schema) ->
get => #{
description => Descr("Get configs for"),
responses => #{
<<"200">> => ?TEXT_BODY("Get configs successfully", Schema),
<<"404">> => emqx_mgmt_util:response_error_schema(
<<"Config not found">>, ['NOT_FOUND'])
<<"200">> => schema(Schema, <<"Get configs successfully">>),
<<"404">> => emqx_mgmt_util:error_schema(<<"Config not found">>, ['NOT_FOUND'])
}
},
put => #{
description => Descr("Update configs for"),
'requestBody' => ?TEXT_BODY("The format of the request body is depend on the 'conf_path' parameter in the query string", Schema),
'requestBody' => schema(Schema),
responses => #{
<<"200">> => ?TEXT_BODY("Update configs successfully", Schema),
<<"400">> => emqx_mgmt_util:response_error_schema(
<<"Update configs failed">>, ['UPDATE_FAILED'])
<<"200">> => schema(Schema, <<"Update configs successfully">>),
<<"400">> => error_schema(<<"Update configs failed">>, ['UPDATE_FAILED'])
}
}
},
@ -97,9 +91,8 @@ config_reset_api() ->
%% We only return "200" rather than the new configs that has been changed, as
%% the schema of the changed configs is depends on the request parameter
%% `conf_path`, it cannot be defined here.
<<"200">> => emqx_mgmt_util:response_schema(<<"Reset configs successfully">>),
<<"400">> => emqx_mgmt_util:response_error_schema(
<<"It's not able to reset the config">>, ['INVALID_OPERATION'])
<<"200">> => schema(<<"Reset configs successfully">>),
<<"400">> => error_schema(<<"It's not able to reset the config">>, ['INVALID_OPERATION'])
}
}
},

View File

@ -27,6 +27,15 @@
, manage_listeners/2
, manage_nodes_listeners/2]).
-import(emqx_mgmt_util, [ schema/1
, schema/2
, object_schema/2
, object_array_schema/2
, error_schema/1
, error_schema/2
, properties/1
]).
-export([format/1]).
-include_lib("emqx/include/emqx.hrl").
@ -41,39 +50,20 @@ api_spec() ->
manage_listeners_api(),
manage_nodes_listeners_api()
],
[listener_schema()]
[]
}.
listener_schema() ->
#{
listener => #{
type => object,
properties => #{
node => #{
type => string,
description => <<"Node">>,
example => node()},
id => #{
type => string,
description => <<"Identifier">>},
acceptors => #{
type => integer,
description => <<"Number of Acceptor process">>},
max_conn => #{
type => integer,
description => <<"Maximum number of allowed connection">>},
type => #{
type => string,
description => <<"Listener type">>},
listen_on => #{
type => string,
description => <<"Listening port">>},
running => #{
type => boolean,
description => <<"Open or close">>},
auth => #{
type => boolean,
description => <<"Has auth">>}}}}.
properties() ->
properties([
{node, string, <<"Node">>},
{id, string, <<"Identifier">>},
{acceptors, integer, <<"Number of Acceptor process">>},
{max_conn, integer, <<"Maximum number of allowed connection">>},
{type, string, <<"Listener type">>},
{listen_on, string, <<"Listener port">>},
{running, boolean, <<"Open or close">>},
{auth, boolean, <<"Has auth">>}
]).
listeners_api() ->
Metadata = #{
@ -81,7 +71,7 @@ listeners_api() ->
description => <<"List listeners in cluster">>,
responses => #{
<<"200">> =>
emqx_mgmt_util:response_array_schema(<<"List all listeners">>, listener)}}},
object_array_schema(properties(), <<"List all listeners">>)}}},
{"/listeners", Metadata, listeners}.
listener_api() ->
@ -91,9 +81,9 @@ listener_api() ->
parameters => [param_path_id()],
responses => #{
<<"404">> =>
emqx_mgmt_util:response_error_schema(<<"Listener id not found">>, ['BAD_LISTENER_ID']),
error_schema(<<"Listener id not found">>, ['BAD_LISTENER_ID']),
<<"200">> =>
emqx_mgmt_util:response_array_schema(<<"List listener info ok">>, listener)}}},
object_array_schema(properties(), <<"List listener info ok">>)}}},
{"/listeners/:id", Metadata, listener}.
manage_listeners_api() ->
@ -105,15 +95,12 @@ manage_listeners_api() ->
param_path_operation()],
responses => #{
<<"500">> =>
emqx_mgmt_util:response_error_schema(<<"Operation Failed">>, ['INTERNAL_ERROR']),
error_schema(<<"Operation Failed">>, ['INTERNAL_ERROR']),
<<"404">> =>
emqx_mgmt_util:response_error_schema(<<"Listener id not found">>,
['BAD_LISTENER_ID']),
error_schema(<<"Listener id not found">>, ['BAD_LISTENER_ID']),
<<"400">> =>
emqx_mgmt_util:response_error_schema(<<"Listener id not found">>,
['BAD_REQUEST']),
<<"200">> =>
emqx_mgmt_util:response_schema(<<"Operation success">>)}}},
error_schema(<<"Listener id not found">>, ['BAD_REQUEST']),
<<"200">> => schema(<<"Operation success">>)}}},
{"/listeners/:id/:operation", Metadata, manage_listeners}.
manage_nodes_listeners_api() ->
@ -126,15 +113,14 @@ manage_nodes_listeners_api() ->
param_path_operation()],
responses => #{
<<"500">> =>
emqx_mgmt_util:response_error_schema(<<"Operation Failed">>, ['INTERNAL_ERROR']),
error_schema(<<"Operation Failed">>, ['INTERNAL_ERROR']),
<<"404">> =>
emqx_mgmt_util:response_error_schema(<<"Bad node or Listener id not found">>,
error_schema(<<"Bad node or Listener id not found">>,
['BAD_NODE_NAME','BAD_LISTENER_ID']),
<<"400">> =>
emqx_mgmt_util:response_error_schema(<<"Listener id not found">>,
['BAD_REQUEST']),
error_schema(<<"Listener id not found">>, ['BAD_REQUEST']),
<<"200">> =>
emqx_mgmt_util:response_schema(<<"Operation success">>)}}},
schema(<<"Operation success">>)}}},
{"/node/:node/listeners/:id/:operation", Metadata, manage_nodes_listeners}.
nodes_listeners_api() ->
@ -144,10 +130,10 @@ nodes_listeners_api() ->
parameters => [param_path_node(), param_path_id()],
responses => #{
<<"404">> =>
emqx_mgmt_util:response_error_schema(<<"Node name or listener id not found">>,
error_schema(<<"Node name or listener id not found">>,
['BAD_NODE_NAME', 'BAD_LISTENER_ID']),
<<"200">> =>
emqx_mgmt_util:response_schema(<<"Get listener info ok">>, listener)}}},
schema(properties(), <<"Get listener info ok">>)}}},
{"/nodes/:node/listeners/:id", Metadata, node_listener}.
nodes_listener_api() ->
@ -156,10 +142,8 @@ nodes_listener_api() ->
description => <<"List listeners in one node">>,
parameters => [param_path_node()],
responses => #{
<<"404">> =>
emqx_mgmt_util:response_error_schema(<<"Listener id not found">>),
<<"200">> =>
emqx_mgmt_util:response_schema(<<"Get listener info ok">>, listener)}}},
<<"404">> => error_schema(<<"Listener id not found">>),
<<"200">> => object_schema(properties(), <<"Get listener info ok">>)}}},
{"/nodes/:node/listeners", Metadata, node_listeners}.
%%%==============================================================================================
%% parameters
@ -199,27 +183,27 @@ listeners(get, _Request) ->
list().
listener(get, Request) ->
ID = binary_to_atom(cowboy_req:binding(id, Request)),
ID = b2a(cowboy_req:binding(id, Request)),
get_listeners(#{id => ID}).
node_listeners(get, Request) ->
Node = binary_to_atom(cowboy_req:binding(node, Request)),
Node = b2a(cowboy_req:binding(node, Request)),
get_listeners(#{node => Node}).
node_listener(get, Request) ->
Node = binary_to_atom(cowboy_req:binding(node, Request)),
ID = binary_to_atom(cowboy_req:binding(id, Request)),
Node = b2a(cowboy_req:binding(node, Request)),
ID = b2a(cowboy_req:binding(id, Request)),
get_listeners(#{node => Node, id => ID}).
manage_listeners(_, Request) ->
ID = binary_to_atom(cowboy_req:binding(id, Request)),
Operation = binary_to_atom(cowboy_req:binding(operation, Request)),
ID = b2a(cowboy_req:binding(id, Request)),
Operation = b2a(cowboy_req:binding(operation, Request)),
manage(Operation, #{id => ID}).
manage_nodes_listeners(_, Request) ->
Node = binary_to_atom(cowboy_req:binding(node, Request)),
ID = binary_to_atom(cowboy_req:binding(id, Request)),
Operation = binary_to_atom(cowboy_req:binding(operation, Request)),
Node = b2a(cowboy_req:binding(node, Request)),
ID = b2a(cowboy_req:binding(id, Request)),
Operation = b2a(cowboy_req:binding(operation, Request)),
manage(Operation, #{id => ID, node => Node}).
%%%==============================================================================================
@ -232,16 +216,16 @@ get_listeners(Param) ->
case list_listener(Param) of
{error, not_found} ->
ID = maps:get(id, Param),
Reason = list_to_binary(io_lib:format("Error listener id ~p", [ID])),
Reason = iolist_to_binary(io_lib:format("Error listener id ~p", [ID])),
{404, #{code => 'BAD_LISTENER_ID', message => Reason}};
{error, nodedown} ->
Node = maps:get(node, Param),
Reason = list_to_binary(io_lib:format("Node ~p rpc failed", [Node])),
Reason = iolist_to_binary(io_lib:format("Node ~p rpc failed", [Node])),
Response = #{code => 'BAD_NODE_NAME', message => Reason},
{404, Response};
[] ->
ID = maps:get(id, Param),
Reason = list_to_binary(io_lib:format("Error listener id ~p", [ID])),
Reason = iolist_to_binary(io_lib:format("Error listener id ~p", [ID])),
{404, #{code => 'BAD_LISTENER_ID', message => Reason}};
Data ->
{200, Data}
@ -253,16 +237,16 @@ manage(Operation0, Param) ->
case list_listener(Param) of
{error, not_found} ->
ID = maps:get(id, Param),
Reason = list_to_binary(io_lib:format("Error listener id ~p", [ID])),
Reason = iolist_to_binary(io_lib:format("Error listener id ~p", [ID])),
{404, #{code => 'BAD_LISTENER_ID', message => Reason}};
{error, nodedown} ->
Node = maps:get(node, Param),
Reason = list_to_binary(io_lib:format("Node ~p rpc failed", [Node])),
Reason = iolist_to_binary(io_lib:format("Node ~p rpc failed", [Node])),
Response = #{code => 'BAD_NODE_NAME', message => Reason},
{404, Response};
[] ->
ID = maps:get(id, Param),
Reason = list_to_binary(io_lib:format("Error listener id ~p", [ID])),
Reason = iolist_to_binary(io_lib:format("Error listener id ~p", [ID])),
{404, #{code => 'RESOURCE_NOT_FOUND', message => Reason}};
ListenersOrSingleListener ->
manage_(Operation, ListenersOrSingleListener)
@ -279,16 +263,16 @@ manage_(Operation, Listeners) when is_list(Listeners) ->
case lists:filter(fun({error, {already_started, _}}) -> false; (_) -> true end, Results) of
[] ->
ID = maps:get(id, hd(Listeners)),
Message = list_to_binary(io_lib:format("Already Started: ~s", [ID])),
Message = iolist_to_binary(io_lib:format("Already Started: ~s", [ID])),
{400, #{code => 'BAD_REQUEST', message => Message}};
_ ->
case lists:filter(fun({error,not_found}) -> false; (_) -> true end, Results) of
[] ->
ID = maps:get(id, hd(Listeners)),
Message = list_to_binary(io_lib:format("Already Stopped: ~s", [ID])),
Message = iolist_to_binary(io_lib:format("Already Stopped: ~s", [ID])),
{400, #{code => 'BAD_REQUEST', message => Message}};
_ ->
Reason = list_to_binary(io_lib:format("~p", [Errors])),
Reason = iolist_to_binary(io_lib:format("~p", [Errors])),
{500, #{code => 'UNKNOW_ERROR', message => Reason}}
end
end
@ -332,3 +316,6 @@ trans_running(Conf) ->
Running ->
Running
end.
b2a(B) when is_binary(B) -> binary_to_atom(B, utf8).

View File

@ -304,14 +304,7 @@ metrics_api() ->
schema => #{type => boolean}
}],
responses => #{
<<"200">> => #{
description => <<"List all metrics">>,
content => #{
'application/json' => #{
schema => minirest:ref(<<"metrics_info">>)
}
}
}
<<"200">> => emqx_mgmt_util:schema(metrics_info, <<"List all metrics">>)
}
}
},

View File

@ -17,6 +17,13 @@
-behaviour(minirest_api).
-import(emqx_mgmt_util, [ schema/2
, object_schema/2
, object_array_schema/2
, error_schema/2
, properties/1
]).
-export([api_spec/0]).
-export([ nodes/2
@ -27,7 +34,7 @@
-include_lib("emqx/include/emqx.hrl").
api_spec() ->
{apis(), schemas()}.
{apis(), []}.
apis() ->
[ nodes_api()
@ -35,125 +42,75 @@ apis() ->
, node_metrics_api()
, node_stats_api()].
schemas() ->
%% notice: node api used schema metrics and stats
%% see these schema in emqx_mgmt_api_metrics emqx_mgmt_api_status
[node_schema()].
node_schema() ->
#{
node => #{
type => object,
properties => #{
node => #{
type => string,
description => <<"Node name">>},
connections => #{
type => integer,
description => <<"Number of clients currently connected to this node">>},
load1 => #{
type => string,
description => <<"CPU average load in 1 minute">>},
load5 => #{
type => string,
description => <<"CPU average load in 5 minute">>},
load15 => #{
type => string,
description => <<"CPU average load in 15 minute">>},
max_fds => #{
type => integer,
description => <<"Maximum file descriptor limit for the operating system">>},
memory_total => #{
type => string,
description => <<"VM allocated system memory">>},
memory_used => #{
type => string,
description => <<"VM occupied system memory">>},
node_status => #{
type => string,
description => <<"Node status">>},
otp_release => #{
type => string,
description => <<"Erlang/OTP version used by EMQ X Broker">>},
process_available => #{
type => integer,
description => <<"Number of available processes">>},
process_used => #{
type => integer,
description => <<"Number of used processes">>},
uptime => #{
type => integer,
description => <<"EMQ X Broker runtime, millisecond">>},
version => #{
type => string,
description => <<"EMQ X Broker version">>},
sys_path => #{
type => string,
description => <<"EMQ X system file location">>},
log_path => #{
type => string,
description => <<"EMQ X log file location">>},
config_path => #{
type => string,
description => <<"EMQ X config file location">>}
}
}
}.
properties() ->
properties([
{node, string, <<"Node name">>},
{connections, integer, <<"Number of clients currently connected to this node">>},
{load1, string, <<"CPU average load in 1 minute">>},
{load5, string, <<"CPU average load in 5 minute">>},
{load15, string, <<"CPU average load in 15 minute">>},
{max_fds, integer, <<"Maximum file descriptor limit for the operating system">>},
{memory_total, string, <<"VM allocated system memory">>},
{memory_used, string, <<"VM occupied system memory">>},
{node_status, string, <<"Node status">>},
{otp_release, string, <<"Erlang/OTP version used by EMQ X Broker">>},
{process_available, integer, <<"Number of available processes">>},
{process_used, integer, <<"Number of used processes">>},
{uptime, integer, <<"EMQ X Broker runtime, millisecond">>},
{version, string, <<"EMQ X Broker version">>},
{sys_path, string, <<"EMQ X system file location">>},
{log_path, string, <<"EMQ X log file location">>},
{config_path, string, <<"EMQ X config file location">>}
]).
parameters() ->
[#{
name => node_name,
in => path,
description => <<"node name">>,
schema => #{type => string},
required => true,
example => node()
}].
nodes_api() ->
Metadata = #{
get => #{
description => <<"List EMQ X nodes">>,
responses => #{
<<"200">> => emqx_mgmt_util:response_array_schema(<<"List EMQ X Nodes">>, node)}}},
<<"200">> => object_array_schema(properties(), <<"List EMQ X Nodes">>)
}
}
},
{"/nodes", Metadata, nodes}.
node_api() ->
Metadata = #{
get => #{
description => <<"Get node info">>,
parameters => [#{
name => node_name,
in => path,
description => "node name",
schema => #{type => string},
required => true,
example => node()}],
parameters => parameters(),
responses => #{
<<"400">> => emqx_mgmt_util:response_error_schema(<<"Node error">>, ['SOURCE_ERROR']),
<<"200">> => emqx_mgmt_util:response_schema(<<"Get EMQ X Nodes info by name">>, node)}}},
<<"400">> => error_schema(<<"Node error">>, ['SOURCE_ERROR']),
<<"200">> => object_schema(properties(), <<"Get EMQ X Nodes info by name">>)}}},
{"/nodes/:node_name", Metadata, node}.
node_metrics_api() ->
Metadata = #{
get => #{
description => <<"Get node metrics">>,
parameters => [#{
name => node_name,
in => path,
description => "node name",
schema => #{type => string},
required => true,
example => node()}],
parameters => parameters(),
responses => #{
<<"400">> => emqx_mgmt_util:response_error_schema(<<"Node error">>, ['SOURCE_ERROR']),
<<"200">> => emqx_mgmt_util:response_schema(<<"Get EMQ X Node Metrics">>, metrics)}}},
<<"400">> => error_schema(<<"Node error">>, ['SOURCE_ERROR']),
<<"200">> => schema(metrics, <<"Get EMQ X Node Metrics">>)}}},
{"/nodes/:node_name/metrics", Metadata, node_metrics}.
node_stats_api() ->
Metadata = #{
get => #{
description => <<"Get node stats">>,
parameters => [#{
name => node_name,
in => path,
description => "node name",
schema => #{type => string},
required => true,
example => node()}],
parameters => parameters(),
responses => #{
<<"400">> => emqx_mgmt_util:response_error_schema(<<"Node error">>, ['SOURCE_ERROR']),
<<"200">> => emqx_mgmt_util:response_schema(<<"Get EMQ X Node Stats">>, stat)}}},
<<"400">> => error_schema(<<"Node error">>, ['SOURCE_ERROR']),
<<"200">> => schema(stat, <<"Get EMQ X Node Stats">>)}}},
{"/nodes/:node_name/stats", Metadata, node_stats}.
%%%==============================================================================================

View File

@ -19,88 +19,48 @@
-behaviour(minirest_api).
-import(emqx_mgmt_util, [ object_schema/1
, object_schema/2
, object_array_schema/1
, object_array_schema/2
, properties/1
]).
-export([api_spec/0]).
-export([ publish/2
, publish_batch/2]).
api_spec() ->
{
[publish_api(), publish_bulk_api()],
[message_schema()]
}.
{[publish_api(), publish_bulk_api()], []}.
publish_api() ->
Schema = #{
type => object,
properties => maps:without([id], message_properties())
},
MeteData = #{
post => #{
description => <<"Publish">>,
'requestBody' => emqx_mgmt_util:request_body_schema(Schema),
'requestBody' => object_schema(maps:without([id], properties())),
responses => #{
<<"200">> => emqx_mgmt_util:response_schema(<<"publish ok">>, message)}}},
<<"200">> => object_schema(properties(), <<"publish ok">>)}}},
{"/publish", MeteData, publish}.
publish_bulk_api() ->
Schema = #{
type => object,
properties => maps:without([id], message_properties())
},
MeteData = #{
post => #{
description => <<"publish">>,
'requestBody' => emqx_mgmt_util:request_body_array_schema(Schema),
'requestBody' => object_array_schema(maps:without([id], properties())),
responses => #{
<<"200">> => emqx_mgmt_util:response_array_schema(<<"publish ok">>, message)}}},
<<"200">> => object_array_schema(properties(), <<"publish ok">>)}}},
{"/publish/bulk", MeteData, publish_batch}.
message_schema() ->
#{
message => #{
type => object,
properties => message_properties()
}
}.
message_properties() ->
#{
id => #{
type => string,
description => <<"Message ID">>},
topic => #{
type => string,
description => <<"Topic">>},
qos => #{
type => integer,
enum => [0, 1, 2],
description => <<"Qos">>},
payload => #{
type => string,
description => <<"Topic">>},
from => #{
type => string,
description => <<"Message from">>},
flag => #{
type => <<"object">>,
description => <<"Message flag">>,
properties => #{
sys => #{
type => boolean,
default => false,
description => <<"System message flag, nullable, default false">>},
dup => #{
type => boolean,
default => false,
description => <<"Dup message flag, nullable, default false">>},
retain => #{
type => boolean,
default => false,
description => <<"Retain message flag, nullable, default false">>}
}
}
}.
properties() ->
properties([
{id, string, <<"Message Id">>},
{topic, string, <<"Topic">>},
{qos, integer, <<"QoS">>, [0, 1, 2]},
{payload, string, <<"Topic">>},
{from, string, <<"Message from">>},
{retain, boolean, <<"Retain message flag, nullable, default false">>}
]).
publish(post, Request) ->
{ok, Body, _} = cowboy_req:read_body(Request),
@ -119,19 +79,8 @@ message(Map) ->
QoS = maps:get(<<"qos">>, Map, 0),
Topic = maps:get(<<"topic">>, Map),
Payload = maps:get(<<"payload">>, Map),
Flags = flags(Map),
emqx_message:make(From, QoS, Topic, Payload, Flags, #{}).
flags(Map) ->
Flags = maps:get(<<"flags">>, Map, #{}),
Retain = maps:get(<<"retain">>, Flags, false),
Sys = maps:get(<<"sys">>, Flags, false),
Dup = maps:get(<<"dup">>, Flags, false),
#{
retain => Retain,
sys => Sys,
dup => Dup
}.
Retain = maps:get(<<"retain">>, Map, false),
emqx_message:make(From, QoS, Topic, Payload, #{retain => Retain}, #{}).
messages(List) ->
[message(MessageMap) || MessageMap <- List].
@ -144,7 +93,7 @@ format_message(#message{id = ID, qos = Qos, from = From, topic = Topic, payload
qos => Qos,
topic => Topic,
payload => Payload,
flag => Flags,
retain => maps:get(retain, Flags, false),
from => to_binary(From)
}.

View File

@ -28,43 +28,35 @@
-define(TOPIC_NOT_FOUND, 'TOPIC_NOT_FOUND').
-import(emqx_mgmt_util, [ object_schema/2
, object_array_schema/2
, error_schema/2
, properties/1
, page_params/0
]).
api_spec() ->
{
[routes_api(), route_api()],
[route_schema()]
[]
}.
route_schema() ->
#{
route => #{
type => object,
properties => #{
topic => #{
type => string},
node => #{
type => string,
example => node()}}}}.
properties() ->
properties([
{topic, string},
{node, string}
]).
routes_api() ->
Metadata = #{
get => #{
description => <<"EMQ X routes">>,
parameters => [
#{
name => page,
in => query,
description => <<"Page">>,
schema => #{type => integer, default => 1}
},
#{
name => limit,
in => query,
description => <<"Page size">>,
schema => #{type => integer, default => emqx_mgmt:max_row_limit()}
}],
parameters => page_params(),
responses => #{
<<"200">> =>
emqx_mgmt_util:response_array_schema("List route info", route)}}},
<<"200">> => object_array_schema(properties(), <<"List route info">>)
}
}
},
{"/routes", Metadata, routes}.
route_api() ->
@ -80,10 +72,12 @@ route_api() ->
}],
responses => #{
<<"200">> =>
emqx_mgmt_util:response_schema(<<"Route info">>, route),
object_schema(properties(), <<"Route info">>),
<<"404">> =>
emqx_mgmt_util:response_error_schema(<<"Topic not found">>, [?TOPIC_NOT_FOUND])
}}},
error_schema(<<"Topic not found">>, [?TOPIC_NOT_FOUND])
}
}
},
{"/routes/:topic", Metadata, route}.
%%%==============================================================================================

View File

@ -30,7 +30,10 @@ status_api() ->
get => #{
security => [],
responses => #{
<<"200">> => #{description => <<"running">>}}}},
<<"200">> => #{description => <<"running">>}
}
}
},
{Path, Metadata, running_status}.
running_status(get, _Request) ->

View File

@ -20,6 +20,11 @@
-include_lib("emqx/include/emqx.hrl").
-import(emqx_mgmt_util, [ page_schema/1
, properties/1
, page_params/0
]).
-export([api_spec/0]).
-export([subscriptions/2]).
@ -39,84 +44,67 @@
-define(format_fun, {?MODULE, format}).
api_spec() ->
{
[subscriptions_api()],
[subscription_schema()]
}.
{subscriptions_api(), subscription_schema()}.
subscriptions_api() ->
MetaData = #{
get => #{
description => <<"List subscriptions">>,
parameters => [
#{
name => page,
in => query,
description => <<"Page">>,
schema => #{type => integer}
},
#{
name => limit,
in => query,
description => <<"Page size">>,
schema => #{type => integer}
},
#{
name => clientid,
in => query,
description => <<"Client ID">>,
schema => #{type => string}
},
#{
name => node,
in => query,
description => <<"Node name">>,
schema => #{type => string}
},
#{
name => qos,
in => query,
description => <<"QoS">>,
schema => #{type => integer, enum => [0, 1, 2]}
},
#{
name => share,
in => query,
description => <<"Shared subscription">>,
schema => #{type => boolean}
},
#{
name => topic,
in => query,
description => <<"Topic">>,
schema => #{type => string}
}
#{
name => match_topic,
in => query,
description => <<"Match topic string">>,
schema => #{type => string}
}
],
parameters => parameters(),
responses => #{
<<"200">> => emqx_mgmt_util:response_page_schema(subscription)}}},
{"/subscriptions", MetaData, subscriptions}.
<<"200">> => page_schema(subscription)
}
}
},
[{"/subscriptions", MetaData, subscriptions}].
subscription_schema() ->
#{
subscription => #{
type => object,
properties => #{
node => #{
type => string},
topic => #{
type => string},
clientid => #{
type => string},
qos => #{
type => integer,
enum => [0,1,2]}}}
}.
Props = properties([
{node, string},
{topic, string},
{clientid, string},
{qos, integer, <<>>, [0,1,2]}]),
[#{subscription => #{type => object, properties => Props}}].
parameters() ->
[
#{
name => clientid,
in => query,
description => <<"Client ID">>,
schema => #{type => string}
},
#{
name => node,
in => query,
description => <<"Node name">>,
schema => #{type => string}
},
#{
name => qos,
in => query,
description => <<"QoS">>,
schema => #{type => integer, enum => [0, 1, 2]}
},
#{
name => share,
in => query,
description => <<"Shared subscription">>,
schema => #{type => boolean}
},
#{
name => topic,
in => query,
description => <<"Topic">>,
schema => #{type => string}
}
#{
name => match_topic,
in => query,
description => <<"Match topic string">>,
schema => #{type => string}
} | page_params()
].
subscriptions(get, Request) ->
Params = cowboy_req:parse_qs(Request),

View File

@ -133,3 +133,4 @@ api_modules() ->
api_modules() ->
minirest_api:find_api_modules(apps()) -- [emqx_mgmt_api_apps].
-endif.

View File

@ -24,15 +24,26 @@
, batch_operation/3
]).
-export([ request_body_schema/1
, request_body_array_schema/1
, response_schema/1
, response_schema/2
, response_array_schema/2
, response_error_schema/1
, response_error_schema/2
, response_page_schema/1
, response_batch_schema/1]).
-export([ bad_request/0
, bad_request/1
, properties/1
, page_params/0
, schema/1
, schema/2
, object_schema/1
, object_schema/2
, array_schema/1
, array_schema/2
, object_array_schema/1
, object_array_schema/2
, page_schema/1
, page_object_schema/1
, error_schema/1
, error_schema/2
, batch_schema/1
]).
-export([urldecode/1]).
@ -90,78 +101,69 @@ urldecode(S) ->
%%%==============================================================================================
%% schema util
schema(Ref) when is_atom(Ref) ->
json_content_schema(minirest:ref(atom_to_binary(Ref, utf8)));
schema(SchemaOrDesc) ->
json_content_schema(SchemaOrDesc).
schema(Ref, Desc) when is_atom(Ref) ->
json_content_schema(minirest:ref(atom_to_binary(Ref, utf8)), Desc);
schema(Schema, Desc) ->
json_content_schema(Schema, Desc).
request_body_array_schema(Schema) when is_map(Schema) ->
json_content_schema("", #{type => array, items => Schema});
request_body_array_schema(Ref) when is_atom(Ref) ->
request_body_array_schema(atom_to_binary(Ref, utf8));
request_body_array_schema(Ref) when is_binary(Ref) ->
json_content_schema("", #{type => array, items => minirest:ref(Ref)}).
object_schema(Properties) when is_map(Properties) ->
json_content_schema(#{type => object, properties => Properties}).
object_schema(Properties, Desc) when is_map(Properties) ->
json_content_schema(#{type => object, properties => Properties}, Desc).
request_body_schema(Schema) when is_map(Schema) ->
json_content_schema("", Schema);
request_body_schema(Ref) when is_atom(Ref) ->
request_body_schema(atom_to_binary(Ref));
request_body_schema(Ref) when is_binary(Ref) ->
json_content_schema("", minirest:ref(Ref)).
array_schema(Ref) when is_atom(Ref) ->
json_content_schema(#{type => array, items => minirest:ref(atom_to_binary(Ref, utf8))}).
array_schema(Ref, Desc) when is_atom(Ref) ->
json_content_schema(#{type => array, items => minirest:ref(atom_to_binary(Ref, utf8))}, Desc);
array_schema(Schema, Desc) ->
json_content_schema(#{type => array, items => Schema}, Desc).
response_array_schema(Description, Schema) when is_map(Schema) ->
json_content_schema(Description, #{type => array, items => Schema});
response_array_schema(Description, Ref) when is_atom(Ref) ->
response_array_schema(Description, atom_to_binary(Ref, utf8));
response_array_schema(Description, Ref) when is_binary(Ref) ->
json_content_schema(Description, #{type => array, items => minirest:ref(Ref)}).
object_array_schema(Properties) when is_map(Properties) ->
json_content_schema(#{type => array, items => #{type => object, properties => Properties}}).
object_array_schema(Properties, Desc) ->
json_content_schema(#{type => array, items => #{type => object, properties => Properties}}, Desc).
response_schema(Description) ->
json_content_schema(Description).
response_schema(Description, Schema) when is_map(Schema) ->
json_content_schema(Description, Schema);
response_schema(Description, Ref) when is_atom(Ref) ->
response_schema(Description, atom_to_binary(Ref, utf8));
response_schema(Description, Ref) when is_binary(Ref) ->
json_content_schema(Description, minirest:ref(Ref)).
%% @doc default code is RESOURCE_NOT_FOUND
response_error_schema(Description) ->
response_error_schema(Description, ['RESOURCE_NOT_FOUND']).
response_error_schema(Description, Enum) ->
Schema = #{
type => object,
properties => #{
code => #{
type => string,
enum => Enum},
message => #{
type => string}}},
json_content_schema(Description, Schema).
response_page_schema(Def) when is_atom(Def) ->
response_page_schema(atom_to_binary(Def, utf8));
response_page_schema(Def) when is_binary(Def) ->
response_page_schema(minirest:ref(Def));
response_page_schema(ItemSchema) when is_map(ItemSchema) ->
Schema = #{
page_schema(Ref) when is_atom(Ref) ->
page_schema(minirest:ref(atom_to_binary(Ref, utf8)));
page_schema(Schema) ->
Schema1 = #{
type => object,
properties => #{
meta => #{
type => object,
properties => #{
page => #{
type => integer},
limit => #{
type => integer},
count => #{
type => integer}}},
properties => properties([{page, integer},
{limit, integer},
{count, integer}])
},
data => #{
type => array,
items => ItemSchema}}},
json_content_schema("", Schema).
items => Schema
}
}
},
json_content_schema(Schema1).
response_batch_schema(DefName) when is_atom(DefName) ->
response_batch_schema(atom_to_binary(DefName, utf8));
response_batch_schema(DefName) when is_binary(DefName) ->
page_object_schema(Properties) when is_map(Properties) ->
page_schema(#{type => object, properties => Properties}).
error_schema(Description) ->
error_schema(Description, ['RESOURCE_NOT_FOUND']).
error_schema(Description, Enum) ->
Schema = #{
type => object,
properties => properties([{code, string, <<>>, Enum},
{message, string}])
},
json_content_schema(Schema, Description).
batch_schema(DefName) when is_atom(DefName) ->
batch_schema(atom_to_binary(DefName, utf8));
batch_schema(DefName) when is_binary(DefName) ->
Schema = #{
type => object,
properties => #{
@ -181,22 +183,17 @@ response_batch_schema(DefName) when is_binary(DefName) ->
data => minirest:ref(DefName),
reason => #{
type => <<"string">>}}}}}},
json_content_schema("", Schema).
json_content_schema(Schema).
json_content_schema(Description, Schema) ->
Content =
#{content => #{
'application/json' => #{
schema => Schema}}},
case Description of
"" ->
Content;
_ ->
maps:merge(#{description => Description}, Content)
end.
json_content_schema(Description) ->
#{description => Description}.
json_content_schema(Schema) when is_map(Schema) ->
#{content => #{'application/json' => #{schema => Schema}}};
json_content_schema(Desc) when is_binary(Desc) ->
#{description => Desc}.
json_content_schema(Schema, Desc) ->
#{
content => #{'application/json' => #{schema => Schema}},
description => Desc
}.
%%%==============================================================================================
batch_operation(Module, Function, ArgsList) ->
@ -215,3 +212,44 @@ batch_operation(Module, Function, [Args | ArgsList], Failed) ->
{error ,Reason} ->
batch_operation(Module, Function, ArgsList, [{Args, Reason} | Failed])
end.
properties(Props) ->
properties(Props, #{}).
properties([], Acc) ->
Acc;
properties([Key| Props], Acc) when is_atom(Key) ->
properties(Props, maps:put(Key, #{type => string}, Acc));
properties([{Key, Type} | Props], Acc) ->
properties(Props, maps:put(Key, #{type => Type}, Acc));
properties([{Key, object, Props1} | Props], Acc) ->
properties(Props, maps:put(Key, #{type => object,
properties => properties(Props1)}, Acc));
properties([{Key, {array, Type}, Desc} | Props], Acc) ->
properties(Props, maps:put(Key, #{type => array,
items => #{type => Type},
description => Desc}, Acc));
properties([{Key, Type, Desc} | Props], Acc) ->
properties(Props, maps:put(Key, #{type => Type, description => Desc}, Acc));
properties([{Key, Type, Desc, Enum} | Props], Acc) ->
properties(Props, maps:put(Key, #{type => Type,
description => Desc,
emum => Enum}, Acc)).
page_params() ->
[#{
name => page,
in => query,
description => <<"Page">>,
schema => #{type => integer, default => 1}
},
#{
name => limit,
in => query,
description => <<"Page size">>,
schema => #{type => integer, default => emqx_mgmt:max_row_limit()}
}].
bad_request() ->
bad_request(<<"Bad Request">>).
bad_request(Desc) ->
object_schema(properties([{message, string}, {code, string}]), Desc).

View File

@ -18,11 +18,12 @@
-behavior(minirest_api).
-import(emqx_mgmt_util, [ response_schema/1
, response_schema/2
, response_error_schema/2
, response_page_schema/1
, request_body_schema/1
-import(emqx_mgmt_util, [ schema/1
, schema/2
, object_schema/2
, error_schema/2
, page_object_schema/1
, properties/1
]).
-define(MAX_PAYLOAD_LENGTH, 2048).
@ -48,77 +49,50 @@
api_spec() ->
{
[status_api(), delayed_messages_api(), delayed_message_api()],
[]
schemas()
}.
delayed_schema() ->
delayed_schema(false).
schemas() ->
[#{delayed => emqx_mgmt_api_configs:gen_schema(emqx:get_raw_config([delayed]))}].
properties() ->
PayloadDesc = io_lib:format("Payload, base64 encode. Payload will be ~p if length large than ~p",
[?PAYLOAD_TOO_LARGE, ?MAX_PAYLOAD_LENGTH]),
properties([
{id, integer, <<"Message Id (MQTT message id hash)">>},
{publish_time, string, <<"publish time, rfc 3339">>},
{topic, string, <<"Topic">>},
{qos, string, <<"QoS">>},
{payload, string, iolist_to_binary(PayloadDesc)},
{form_clientid, string, <<"Form ClientId">>},
{form_username, string, <<"Form Username">>}
]).
delayed_schema(WithPayload) ->
case WithPayload of
true ->
#{
type => object,
properties => delayed_message_properties()
};
_ ->
#{
type => object,
properties => maps:without([payload], delayed_message_properties())
}
end.
delayed_message_properties() ->
PayloadDesc = list_to_binary(
io_lib:format("Payload, base64 encode. Payload will be ~p if length large than ~p",
[?PAYLOAD_TOO_LARGE, ?MAX_PAYLOAD_LENGTH])),
#{
id => #{
type => integer,
description => <<"Message Id (MQTT message id hash)">>},
publish_time => #{
type => string,
description => <<"publish time, rfc 3339">>},
topic => #{
type => string,
description => <<"Topic">>},
qos => #{
type => integer,
enum => [0, 1, 2],
description => <<"Qos">>},
payload => #{
type => string,
description => PayloadDesc},
form_clientid => #{
type => string,
description => <<"Client ID">>},
form_username => #{
type => string,
description => <<"Username">>}
}.
parameters() ->
[#{
name => id,
in => path,
schema => #{type => string},
required => true
}].
status_api() ->
Schema = #{
type => object,
properties => #{
enable => #{
type => boolean},
max_delayed_messages => #{
type => integer,
description => <<"Max limit, 0 is no limit">>}}},
Metadata = #{
get => #{
description => "Get delayed status",
description => <<"Get delayed status">>,
responses => #{
<<"200">> => response_schema(<<"Bad Request">>, Schema)}},
<<"200">> => schema(delayed)}
},
put => #{
description => "Enable or disable delayed, set max delayed messages",
'requestBody' => request_body_schema(Schema),
description => <<"Enable or disable delayed, set max delayed messages">>,
'requestBody' => schema(delayed),
responses => #{
<<"200">> =>
response_schema(<<"Enable or disable delayed successfully">>, Schema),
schema(delayed, <<"Enable or disable delayed successfully">>),
<<"400">> =>
response_error_schema(<<"Already disabled or enabled">>, [?ALREADY_ENABLED, ?ALREADY_DISABLED])}}},
error_schema(<<"Already disabled or enabled">>, [?ALREADY_ENABLED, ?ALREADY_DISABLED])
}
}
},
{"/mqtt/delayed_messages/status", Metadata, status}.
delayed_messages_api() ->
@ -126,32 +100,30 @@ delayed_messages_api() ->
get => #{
description => "List delayed messages",
responses => #{
<<"200">> => response_page_schema(delayed_schema())}}},
<<"200">> => page_object_schema(properties())
}
}
},
{"/mqtt/delayed_messages", Metadata, delayed_messages}.
delayed_message_api() ->
Metadata = #{
get => #{
description => "Get delayed message",
parameters => [#{
name => id,
in => path,
schema => #{type => string},
required => true
}],
description => <<"Get delayed message">>,
parameters => parameters(),
responses => #{
<<"200">> => response_schema(<<"Get delayed message success">>, delayed_schema(true)),
<<"404">> => response_error_schema(<<"Message ID not found">>, [?MESSAGE_ID_NOT_FOUND])}},
<<"200">> => object_schema(maps:without([payload], properties()), <<"Get delayed message success">>),
<<"404">> => error_schema(<<"Message ID not found">>, [?MESSAGE_ID_NOT_FOUND])
}
},
delete => #{
description => "Delete delayed message",
parameters => [#{
name => id,
in => path,
schema => #{type => string},
required => true
}],
description => <<"Delete delayed message">>,
parameters => parameters(),
responses => #{
<<"200">> => response_schema(<<"Delete delayed message success">>)}}},
<<"200">> => schema(<<"Delete delayed message success">>)
}
}
},
{"/mqtt/delayed_messages/:id", Metadata, delayed_message}.
%%--------------------------------------------------------------------
@ -181,7 +153,7 @@ delayed_message(get, Request) ->
{200, Message#{payload => base64:encode(Payload)}}
end;
{error, not_found} ->
Message = list_to_binary(io_lib:format("Message ID ~p not found", [Id])),
Message = iolist_to_binary(io_lib:format("Message ID ~p not found", [Id])),
{404, #{code => ?MESSAGE_ID_NOT_FOUND, message => Message}}
end;
delayed_message(delete, Request) ->

View File

@ -21,51 +21,15 @@
-export([event_message/2]).
-import(emqx_mgmt_util, [ schema/1
]).
api_spec() ->
{[event_message_api()], [event_message_schema()]}.
event_message_schema() ->
#{
type => object,
properties => #{
'$event/client_connected' => #{
type => boolean,
description => <<"Client connected event">>,
example => get_raw(<<"$event/client_connected">>)
},
'$event/client_disconnected' => #{
type => boolean,
description => <<"client_disconnected">>,
example => get_raw(<<"Client disconnected event">>)
},
'$event/client_subscribed' => #{
type => boolean,
description => <<"client_subscribed">>,
example => get_raw(<<"Client subscribed event">>)
},
'$event/client_unsubscribed' => #{
type => boolean,
description => <<"client_unsubscribed">>,
example => get_raw(<<"Client unsubscribed event">>)
},
'$event/message_delivered' => #{
type => boolean,
description => <<"message_delivered">>,
example => get_raw(<<"Message delivered event">>)
},
'$event/message_acked' => #{
type => boolean,
description => <<"message_acked">>,
example => get_raw(<<"Message acked event">>)
},
'$event/message_dropped' => #{
type => boolean,
description => <<"message_dropped">>,
example => get_raw(<<"Message dropped event">>)
}
}
}.
Conf = emqx:get_raw_config([event_message]),
#{event_message => emqx_mgmt_api_configs:gen_schema(Conf)}.
event_message_api() ->
Path = "/mqtt/event_message",
@ -73,14 +37,14 @@ event_message_api() ->
get => #{
description => <<"Event Message">>,
responses => #{
<<"200">> =>
emqx_mgmt_util:response_schema(<<>>, event_message_schema())}},
<<"200">> => schema(event_message)
}
},
post => #{
description => <<"">>,
'requestBody' => emqx_mgmt_util:request_body_schema(event_message_schema()),
description => <<"Update Event Message">>,
'requestBody' => schema(event_message),
responses => #{
<<"200">> =>
emqx_mgmt_util:response_schema(<<>>, event_message_schema())
<<"200">> => schema(event_message)
}
}
},
@ -94,6 +58,3 @@ event_message(post, Request) ->
Params = emqx_json:decode(Body, [return_maps]),
_ = emqx_event_message:update(Params),
{200, emqx_event_message:list()}.
get_raw(Key) ->
emqx_config:get_raw([<<"event_message">>] ++ [Key], false).

View File

@ -25,28 +25,20 @@
-define(EXCEED_LIMIT, 'EXCEED_LIMIT').
-import(emqx_mgmt_util, [ object_array_schema/1
, object_array_schema/2
, error_schema/2
, properties/1
]).
api_spec() ->
{[rewrite_api()], []}.
topic_rewrite_schema() ->
#{
type => object,
properties => #{
action => #{
type => string,
description => <<"Node">>,
enum => [subscribe, publish]},
source_topic => #{
type => string,
description => <<"Topic">>},
re => #{
type => string,
description => <<"Regular expressions">>},
dest_topic => #{
type => string,
description => <<"Destination topic">>}
}
}.
properties() ->
properties([{action, string, <<"Node">>, [subscribe, publish]},
{source_topic, string, <<"Topic">>},
{re, string, <<"Regular expressions">>},
{dest_topic, string, <<"Destination topic">>}]).
rewrite_api() ->
Path = "/mqtt/topic_rewrite",
@ -54,15 +46,18 @@ rewrite_api() ->
get => #{
description => <<"List topic rewrite">>,
responses => #{
<<"200">> =>
emqx_mgmt_util:response_array_schema(<<"List all rewrite rules">>, topic_rewrite_schema())}},
<<"200">> => object_array_schema(properties(), <<"List all rewrite rules">>)
}
},
post => #{
description => <<"Update topic rewrite">>,
'requestBody' => emqx_mgmt_util:request_body_array_schema(topic_rewrite_schema()),
'requestBody' => object_array_schema(properties()),
responses => #{
<<"200">> =>
emqx_mgmt_util:response_schema(<<"Update topic rewrite success">>, topic_rewrite_schema()),
<<"413">> => emqx_mgmt_util:response_error_schema(<<"Rules count exceed max limit">>, [?EXCEED_LIMIT])}}},
<<"200">> =>object_array_schema(properties(), <<"Update topic rewrite success">>),
<<"413">> => error_schema(<<"Rules count exceed max limit">>, [?EXCEED_LIMIT])
}
}
},
{Path, Metadata, topic_rewrite}.
topic_rewrite(get, _Request) ->
@ -76,6 +71,6 @@ topic_rewrite(post, Request) ->
ok = emqx_rewrite:update(Params),
{200, emqx_rewrite:list()};
_ ->
Message = list_to_binary(io_lib:format("Max rewrite rules count is ~p", [?MAX_RULES_LIMIT])),
Message = iolist_to_binary(io_lib:format("Max rewrite rules count is ~p", [?MAX_RULES_LIMIT])),
{413, #{code => ?EXCEED_LIMIT, message => Message}}
end.

View File

@ -18,9 +18,11 @@
-behavior(minirest_api).
-import(emqx_mgmt_util, [ response_schema/1
, response_schema/2
, request_body_schema/1
-import(emqx_mgmt_util, [ schema/1
, object_schema/1
, object_schema/2
, properties/1
, bad_request/0
]).
% -export([cli/1]).
@ -34,96 +36,38 @@
-export([api_spec/0]).
api_spec() ->
{[status_api(), data_api()], schemas()}.
{[status_api(), data_api()], []}.
schemas() ->
[#{broker_info => #{
type => object,
properties => #{
emqx_version => #{
type => string,
description => <<"EMQ X Version">>},
license => #{
type => object,
properties => #{
edition => #{type => string}
},
description => <<"EMQ X License">>},
os_name => #{
type => string,
description => <<"OS Name">>},
os_version => #{
type => string,
description => <<"OS Version">>},
otp_version => #{
type => string,
description => <<"Erlang/OTP Version">>},
up_time => #{
type => integer,
description => <<"EMQ X Runtime">>},
uuid => #{
type => string,
description => <<"EMQ X UUID">>},
nodes_uuid => #{
type => array,
items => #{type => string},
description => <<"EMQ X Cluster Nodes UUID">>},
active_plugins => #{
type => array,
items => #{type => string},
description => <<"EMQ X Active Plugins">>},
active_modules => #{
type => array,
items => #{type => string},
description => <<"EMQ X Active Modules">>},
num_clients => #{
type => integer,
description => <<"EMQ X Current Connections">>},
messages_received => #{
type => integer,
description => <<"EMQ X Current Received Message">>},
messages_sent => #{
type => integer,
description => <<"EMQ X Current Sent Message">>}
}
}}].
properties() ->
properties([
{emqx_version, string, <<"EMQ X Version">>},
{license, object, [{edition, string, <<"EMQ X License">>}]},
{os_name, string, <<"OS Name">>},
{os_version, string, <<"OS Version">>},
{otp_version, string, <<"Erlang/OTP Version">>},
{up_time, string, <<"EMQ X Runtime">>},
{uuid, string, <<"EMQ X UUID">>},
{nodes_uuid, string, <<"EMQ X Cluster Nodes UUID">>},
{active_plugins, {array, string}, <<"EMQ X Active Plugins">>},
{active_modules, {array, string}, <<"EMQ X Active Modules">>},
{num_clients, integer, <<"EMQ X Current Connections">>},
{messages_received, integer, <<"EMQ X Current Received Message">>},
{messages_sent, integer, <<"EMQ X Current Sent Message">>}
]).
status_api() ->
Props = properties([{enable, boolean}]),
Metadata = #{
get => #{
description => "Get telemetry status",
responses => #{
<<"200">> => response_schema(<<"Bad Request">>,
#{
type => object,
properties => #{enable => #{type => boolean}}
}
)
}
responses => #{<<"200">> => object_schema(Props)}
},
put => #{
description => "Enable or disbale telemetry",
'requestBody' => request_body_schema(#{
type => object,
properties => #{
enable => #{
type => boolean
}
}
}),
'requestBody' => object_schema(Props),
responses => #{
<<"200">> =>
response_schema(<<"Enable or disbale telemetry successfully">>),
<<"400">> =>
response_schema(<<"Bad Request">>,
#{
type => object,
properties => #{
message => #{type => string},
code => #{type => string}
}
}
)
<<"200">> => schema(<<"Enable or disbale telemetry successfully">>),
<<"400">> => bad_request()
}
}
},
@ -133,7 +77,7 @@ data_api() ->
Metadata = #{
get => #{
responses => #{
<<"200">> => response_schema(<<"Get telemetry data">>, <<"broker_info">>)
<<"200">> => object_schema(properties(), <<"Get telemetry data">>)
}
}
},

View File

@ -18,11 +18,11 @@
-behavior(minirest_api).
-import(emqx_mgmt_util, [ request_body_schema/1
, response_schema/1
, response_schema/2
, response_array_schema/2
, response_error_schema/2
-import(emqx_mgmt_util, [ properties/1
, schema/1
, object_schema/2
, object_array_schema/2
, error_schema/2
]).
-export([api_spec/0]).
@ -49,113 +49,88 @@ api_spec() ->
reset_all_topic_metrics_api(),
reset_topic_metrics_api()
],
[
topic_metrics_schema()
]
[]
}.
topic_metrics_schema() ->
#{
topic_metrics => #{
type => object,
properties => #{
topic => #{type => string},
create_time => #{
type => string,
description => <<"Date time, rfc3339">>
},
reset_time => #{
type => string,
description => <<"Nullable. Date time, rfc3339.">>
},
metrics => #{
type => object,
properties => #{
'messages.dropped.count' => #{type => integer},
'messages.dropped.rate' => #{type => number},
'messages.in.count' => #{type => integer},
'messages.in.rate' => #{type => number},
'messages.out.count' => #{type => integer},
'messages.out.rate' => #{type => number},
'messages.qos0.in.count' => #{type => integer},
'messages.qos0.in.rate' => #{type => number},
'messages.qos0.out.count' => #{type => integer},
'messages.qos0.out.rate' => #{type => number},
'messages.qos1.in.count' => #{type => integer},
'messages.qos1.in.rate' => #{type => number},
'messages.qos1.out.count' => #{type => integer},
'messages.qos1.out.rate' => #{type => number},
'messages.qos2.in.count' => #{type => integer},
'messages.qos2.in.rate' => #{type => number},
'messages.qos2.out.count' => #{type => integer},
'messages.qos2.out.rate' => #{type => number}
}
}
}
}
}.
properties() ->
properties([
{topic, string},
{create_time, string, <<"Date time, rfc3339">>},
{reset_time, string, <<"Nullable. Date time, rfc3339.">>},
{metrics, object, [{'messages.dropped.count', integer},
{'messages.dropped.rate', number},
{'messages.in.count', integer},
{'messages.in.rate', number},
{'messages.out.count', integer},
{'messages.out.rate', number},
{'messages.qos0.in.count', integer},
{'messages.qos0.in.rate', number},
{'messages.qos0.out.count', integer},
{'messages.qos0.out.rate', number},
{'messages.qos1.in.count', integer},
{'messages.qos1.in.rate', number},
{'messages.qos1.out.count', integer},
{'messages.qos1.out.rate', number},
{'messages.qos2.in.count', integer},
{'messages.qos2.in.rate', number},
{'messages.qos2.out.count', integer},
{'messages.qos2.out.rate', number}]}
]).
list_topic_api() ->
Path = "/mqtt/topic_metrics",
TopicSchema = #{
type => object,
properties => #{
topic => #{
type => string}}},
Props = properties([{topic, string}]),
MetaData = #{
get => #{
description => <<"List topic">>,
responses => #{
<<"200">> =>
response_array_schema(<<"List topic">>, TopicSchema)}}},
{Path, MetaData, list_topic}.
responses => #{<<"200">> => object_array_schema(Props, <<"List topic">>)}
}
},
{"/mqtt/topic_metrics", MetaData, list_topic}.
list_topic_metrics_api() ->
Path = "/mqtt/topic_metrics/metrics",
MetaData = #{
get => #{
description => <<"List topic metrics">>,
responses => #{
<<"200">> =>
response_array_schema(<<"List topic metrics">>, topic_metrics)}}},
{Path, MetaData, list_topic_metrics}.
<<"200">> => object_array_schema(properties(), <<"List topic metrics">>)
}
}
},
{"/mqtt/topic_metrics/metrics", MetaData, list_topic_metrics}.
get_topic_metrics_api() ->
Path = "/mqtt/topic_metrics/metrics/:topic",
MetaData = #{
get => #{
description => <<"List topic metrics">>,
parameters => [topic_param()],
responses => #{
<<"200">> =>
response_schema(<<"List topic metrics">>, topic_metrics)}},
<<"200">> => object_schema(properties(), <<"List topic metrics">>)}},
put => #{
description => <<"Register topic metrics">>,
parameters => [topic_param()],
responses => #{
<<"200">> =>
response_schema(<<"Register topic metrics">>),
<<"409">> =>
response_error_schema(<<"Topic metrics max limit">>, [?EXCEED_LIMIT]),
<<"400">> =>
response_error_schema(<<"Topic metrics already exist">>, [?BAD_REQUEST])}},
<<"200">> => schema(<<"Register topic metrics">>),
<<"409">> => error_schema(<<"Topic metrics max limit">>, [?EXCEED_LIMIT]),
<<"400">> => error_schema(<<"Topic metrics already exist">>, [?BAD_REQUEST])
}
},
delete => #{
description => <<"Deregister topic metrics">>,
parameters => [topic_param()],
responses => #{
<<"200">> =>
response_schema(<<"Deregister topic metrics">>)}}},
{Path, MetaData, operate_topic_metrics}.
responses => #{ <<"200">> => schema(<<"Deregister topic metrics">>)}
}
},
{"/mqtt/topic_metrics/metrics/:topic", MetaData, operate_topic_metrics}.
reset_all_topic_metrics_api() ->
Path = "/mqtt/topic_metrics/reset",
MetaData = #{
put => #{
description => <<"Reset all topic metrics">>,
responses => #{
<<"200">> =>
response_schema(<<"Reset all topic metrics">>)}}},
{Path, MetaData, reset_all_topic_metrics}.
responses => #{<<"200">> => schema(<<"Reset all topic metrics">>)}
}
},
{"/mqtt/topic_metrics/reset", MetaData, reset_all_topic_metrics}.
reset_topic_metrics_api() ->
Path = "/mqtt/topic_metrics/reset/:topic",
@ -163,9 +138,9 @@ reset_topic_metrics_api() ->
put => #{
description => <<"Reset topic metrics">>,
parameters => [topic_param()],
responses => #{
<<"200">> =>
response_schema(<<"Reset topic metrics">>)}}},
responses => #{<<"200">> => schema(<<"Reset topic metrics">>)}
}
},
{Path, MetaData, reset_topic_metrics}.
topic_param() ->

View File

@ -20,9 +20,8 @@
-include("emqx_prometheus.hrl").
-import(emqx_mgmt_util, [ response_schema/2
, request_body_schema/1
]).
-import(emqx_mgmt_util, [ schema/1
, bad_request/0]).
-export([api_spec/0]).
@ -40,23 +39,14 @@ prometheus_api() ->
Metadata = #{
get => #{
description => <<"Get Prometheus info">>,
responses => #{
<<"200">> => response_schema(<<>>, prometheus)
}
responses => #{<<"200">> => schema(prometheus)}
},
put => #{
description => <<"Update Prometheus">>,
'requestBody' => request_body_schema(prometheus),
'requestBody' => schema(prometheus),
responses => #{
<<"200">> =>response_schema(<<>>, prometheus),
<<"400">> =>
response_schema(<<"Bad Request">>, #{
type => object,
properties => #{
message => #{type => string},
code => #{type => string}
}
})
<<"200">> => schema(prometheus),
<<"400">> => bad_request()
}
}
},

View File

@ -187,8 +187,8 @@ init([]) ->
end}.
handle_call({update_config, Conf}, _, State) ->
State2 = update_config(State, Conf),
emqx_config:put([?APP], Conf),
{ok, Config} = emqx:update_config([?APP], Conf),
State2 = update_config(State, maps:get(config, Config)),
{reply, ok, State2};
handle_call({wait_semaphore, Id}, From, #{wait_quotas := Waits} = State) ->

View File

@ -27,14 +27,12 @@
, config/2]).
-import(emqx_mgmt_api_configs, [gen_schema/1]).
-import(emqx_mgmt_util, [ response_array_schema/2
, response_schema/1
, response_error_schema/2]).
-define(CFG_BODY(DESCR),
#{description => list_to_binary(DESCR),
content => #{<<"application/json">> =>
#{schema => gen_schema(emqx_config:get([emqx_retainer]))}}}).
-import(emqx_mgmt_util, [ object_array_schema/2
, schema/1
, schema/2
, error_schema/2
, page_params/0
, properties/1]).
api_spec() ->
{
@ -42,76 +40,86 @@ api_spec() ->
, with_topic_api()
, config_api()
],
[ message_schema(message, fun message_properties/0)
, message_schema(detail_message, fun detail_message_properties/0)
]
schemas()
}.
schemas() ->
MqttRetainer = gen_schema(emqx:get_raw_config([emqx_retainer])),
[#{emqx_retainer => MqttRetainer}].
message_props() ->
properties([
{id, string, <<"Message ID">>},
{topic, string, <<"MQTT Topic">>},
{qos, string, <<"MQTT QoS">>},
{payload, string, <<"MQTT Payload">>},
{publish_at, string, <<"publish datetime">>},
{from_clientid, string, <<"publisher ClientId">>},
{from_username, string, <<"publisher Username">>}
]).
parameters() ->
[#{
name => topic,
in => path,
required => true,
schema => #{type => "string"}
}].
lookup_retained_api() ->
Metadata =
#{get => #{description => <<"lookup matching messages">>,
parameters => [ #{name => page,
in => query,
description => <<"Page">>,
schema => #{type => integer, default => 1}}
, #{name => limit,
in => query,
description => <<"Page size">>,
schema => #{type => integer,
default => emqx_mgmt:max_row_limit()}}
],
responses => #{ <<"200">> =>
response_array_schema("List retained messages", message)
, <<"405">> => response_schema(<<"NotAllowed">>)
}}},
Metadata = #{
get => #{
description => <<"lookup matching messages">>,
parameters => page_params(),
responses => #{
<<"200">> => object_array_schema(
maps:without([payload], message_props()),
<<"List retained messages">>),
<<"405">> => schema(<<"NotAllowed">>)
}
}
},
{"/mqtt/retainer/messages", Metadata, lookup_retained_warp}.
with_topic_api() ->
MetaData = #{get => #{description => <<"lookup matching messages">>,
parameters => [ #{name => topic,
in => path,
required => true,
schema => #{type => "string"}}
, #{name => page,
in => query,
description => <<"Page">>,
schema => #{type => integer, default => 1}}
, #{name => limit,
in => query,
description => <<"Page size">>,
schema => #{type => integer,
default => emqx_mgmt:max_row_limit()}}
],
responses => #{ <<"200">> =>
response_array_schema("List retained messages", detail_message)
, <<"405">> => response_schema(<<"NotAllowed">>)}},
delete => #{description => <<"delete matching messages">>,
parameters => [#{name => topic,
in => path,
required => true,
schema => #{type => "string"}}],
responses => #{ <<"200">> => response_schema(<<"Successed">>)
, <<"405">> => response_schema(<<"NotAllowed">>)}}
},
MetaData = #{
get => #{
description => <<"lookup matching messages">>,
parameters => parameters() ++ page_params(),
responses => #{
<<"200">> => object_array_schema(message_props(), <<"List retained messages">>),
<<"405">> => schema(<<"NotAllowed">>)
}
},
delete => #{
description => <<"delete matching messages">>,
parameters => parameters(),
responses => #{
<<"200">> => schema(<<"Successed">>),
<<"405">> => schema(<<"NotAllowed">>)
}
}
},
{"/mqtt/retainer/message/:topic", MetaData, with_topic_warp}.
config_api() ->
MetaData = #{
get => #{
description => <<"get retainer config">>,
responses => #{<<"200">> => ?CFG_BODY("Get configs successfully"),
<<"404">> => response_error_schema(
<<"Config not found">>, ['NOT_FOUND'])}
},
put => #{
description => <<"Update retainer config">>,
'requestBody' =>
?CFG_BODY("The format of the request body is depend on the 'conf_path' parameter in the query string"),
responses => #{<<"200">> => response_schema("Update configs successfully"),
<<"400">> => response_error_schema(
<<"Update configs failed">>, ['UPDATE_FAILED'])}
}
},
get => #{
description => <<"get retainer config">>,
responses => #{
<<"200">> => schema(mqtt_retainer, <<"Get configs successfully">>),
<<"404">> => error_schema(<<"Config not found">>, ['NOT_FOUND'])
}
},
put => #{
description => <<"Update retainer config">>,
'requestBody' => schema(mqtt_retainer),
responses => #{
<<"200">> => schema(mqtt_retainer, <<"Update configs successfully">>),
<<"400">> => error_schema(<<"Update configs failed">>, ['UPDATE_FAILED'])
}
}
},
{"/mqtt/retainer", MetaData, config}.
lookup_retained_warp(Type, Req) ->
@ -121,7 +129,7 @@ with_topic_warp(Type, Req) ->
check_backend(Type, Req, fun with_topic/2).
config(get, _) ->
Config = emqx_config:get([emqx_retainer]),
Config = emqx:get_config([mqtt_retainer]),
Body = emqx_json:encode(Config),
{200, Body};
@ -129,16 +137,12 @@ config(put, Req) ->
try
{ok, Body, _} = cowboy_req:read_body(Req),
Cfg = emqx_json:decode(Body),
{ok, RawConf} = hocon:binary(jsx:encode(#{<<"emqx_retainer">> => Cfg}),
#{format => richmap}),
RichConf = hocon_schema:check(emqx_retainer_schema, RawConf, #{atom_key => true}),
#{emqx_retainer := Conf} = hocon_schema:richmap_to_map(RichConf),
emqx_retainer:update_config(Conf),
emqx_retainer:update_config(Cfg),
{200, #{<<"content-type">> => <<"text/plain">>}, <<"Update configs successfully">>}
catch _:Reason:_ ->
{400,
#{code => 'UPDATE_FAILED',
message => erlang:list_to_binary(io_lib:format("~p~n", [Reason]))}}
message => iolist_to_binary(io_lib:format("~p~n", [Reason]))}}
end.
%%------------------------------------------------------------------------------
@ -169,30 +173,6 @@ lookup(Topic, Req, Formatter) ->
{200, format_message(Msgs, Formatter)}.
message_schema(Type, Properties) ->
#{Type => #{type => object,
properties => Properties()}}.
message_properties() ->
#{msgid => #{type => string,
description => <<"Message ID">>},
topic => #{type => string,
description => <<"Topic">>},
qos => #{type => integer,
enum => [0, 1, 2],
description => <<"Qos">>},
publish_at => #{type => string,
description => <<"publish datetime">>},
from_clientid => #{type => string,
description => <<"Message from">>},
from_username => #{type => string,
description => <<"publish username">>}}.
detail_message_properties() ->
Base = message_properties(),
Base#{payload => #{type => string,
description => <<"Topic">>}}.
format_message(Messages, Formatter) when is_list(Messages)->
[Formatter(Message) || Message <- Messages];

View File

@ -26,59 +26,38 @@
all() -> emqx_ct:all(?MODULE).
-define(BASE_CONF, <<"""
emqx_retainer {
enable = true
msg_clear_interval = 0s
msg_expiry_interval = 0s
max_payload_size = 1MB
flow_control {
max_read_number = 0
msg_deliver_quota = 0
quota_release_interval = 0s
}
config {
type = built_in_database
storage_type = ram
max_retained_messages = 0
}
}""">>).
%%--------------------------------------------------------------------
%% Setups
%%--------------------------------------------------------------------
init_per_suite(Config) ->
application:stop(emqx_retainer),
emqx_ct_helpers:start_apps([emqx_retainer], fun set_special_configs/1),
ok = emqx_config:init_load(emqx_retainer_schema, ?BASE_CONF),
emqx_ct_helpers:start_apps([emqx_retainer]),
Config.
end_per_suite(_Config) ->
emqx_ct_helpers:stop_apps([emqx_retainer]).
init_per_testcase(TestCase, Config) ->
emqx_retainer:clean(),
DefaultCfg = new_emqx_retainer_conf(),
NewCfg = case TestCase of
t_message_expiry_2 ->
DefaultCfg#{msg_expiry_interval := 2000};
t_flow_control ->
DefaultCfg#{flow_control := #{max_read_number => 1,
msg_deliver_quota => 1,
quota_release_interval => timer:seconds(1)}};
_ ->
DefaultCfg
end,
emqx_retainer:update_config(NewCfg),
application:ensure_all_started(emqx_retainer),
Config.
set_special_configs(emqx_retainer) ->
init_emqx_retainer_conf();
set_special_configs(_) ->
ok.
init_emqx_retainer_conf() ->
emqx_config:put([?APP], new_emqx_retainer_conf()).
new_emqx_retainer_conf() ->
#{enable => true,
msg_expiry_interval => 0,
msg_clear_interval => 0,
config => #{type => built_in_database,
max_retained_messages => 0,
storage_type => ram},
flow_control => #{max_read_number => 0,
msg_deliver_quota => 0,
quota_release_interval => 0},
max_payload_size => 1024 * 1024}.
%%--------------------------------------------------------------------
%% Test Cases
%%--------------------------------------------------------------------
t_store_and_clean(_) ->
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
{ok, _} = emqtt:connect(C1),
@ -184,13 +163,14 @@ t_message_expiry(_) ->
ok = emqtt:disconnect(C1).
t_message_expiry_2(_) ->
emqx_retainer:update_config(#{<<"msg_expiry_interval">> => <<"2s">>}),
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
{ok, _} = emqtt:connect(C1),
emqtt:publish(C1, <<"retained">>, <<"expire">>, [{qos, 0}, {retain, true}]),
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained">>, [{qos, 0}, {rh, 0}]),
?assertEqual(1, length(receive_messages(1))),
timer:sleep(3000),
timer:sleep(4000),
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained">>, [{qos, 0}, {rh, 0}]),
?assertEqual(0, length(receive_messages(1))),
{ok, #{}, [0]} = emqtt:unsubscribe(C1, <<"retained">>),
@ -216,6 +196,9 @@ t_clean(_) ->
ok = emqtt:disconnect(C1).
t_flow_control(_) ->
emqx_retainer:update_config(#{<<"flow_control">> => #{<<"max_read_number">> => 1,
<<"msg_deliver_quota">> => 1,
<<"quota_release_interval">> => <<"1s">>}}),
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
{ok, _} = emqtt:connect(C1),
emqtt:publish(C1, <<"retained/0">>, <<"this is a retained message 0">>, [{qos, 0}, {retain, true}]),

View File

@ -21,27 +21,38 @@
-include_lib("eunit/include/eunit.hrl").
-define(BASE_CONF, <<"""
emqx_retainer {
enable = true
msg_clear_interval = 0s
msg_expiry_interval = 0s
max_payload_size = 1MB
flow_control {
max_read_number = 0
msg_deliver_quota = 0
quota_release_interval = 0s
}
config {
type = built_in_database
storage_type = ram
max_retained_messages = 0
}
}""">>).
all() -> emqx_ct:all(?MODULE).
init_per_suite(Config) ->
ok = emqx_config:init_load(emqx_retainer_schema, ?BASE_CONF),
%% Meck emqtt
ok = meck:new(emqtt, [non_strict, passthrough, no_history, no_link]),
%% Start Apps
emqx_ct_helpers:start_apps([emqx_retainer], fun set_special_configs/1),
emqx_ct_helpers:start_apps([emqx_retainer]),
Config.
end_per_suite(_Config) ->
ok = meck:unload(emqtt),
emqx_ct_helpers:stop_apps([emqx_retainer]).
%%--------------------------------------------------------------------
%% Helpers
%%--------------------------------------------------------------------
set_special_configs(emqx_retainer) ->
emqx_retainer_SUITE:init_emqx_retainer_conf();
set_special_configs(_) ->
ok.
client_info(Key, Client) ->
maps:get(Key, maps:from_list(emqtt:info(Client)), undefined).

View File

@ -20,7 +20,8 @@
-include("emqx_statsd.hrl").
-import(emqx_mgmt_util, [response_schema/2, request_body_schema/1]).
-import(emqx_mgmt_util, [ schema/1
, bad_request/0]).
-export([api_spec/0]).
@ -37,24 +38,14 @@ statsd_api() ->
Metadata = #{
get => #{
description => <<"Get statsd info">>,
responses => #{
<<"200">> => response_schema(<<>>, statsd)
}
responses => #{<<"200">> => schema(statsd)}
},
put => #{
description => <<"Update Statsd">>,
'requestBody' => request_body_schema(statsd),
'requestBody' => schema(statsd),
responses => #{
<<"200">> =>
response_schema(<<>>, statsd),
<<"400">> =>
response_schema(<<"Bad Request">>, #{
type => object,
properties => #{
message => #{type => string},
code => #{type => string}
}
})
<<"200">> => schema(statsd),
<<"400">> => bad_request()
}
}
},