Merge pull request #6953 from DDDHuang/api_hocon

part1: clients & subscriptions api hocon support
This commit is contained in:
DDDHuang 2022-02-10 14:24:04 +08:00 committed by GitHub
commit 50859bbd83
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 352 additions and 441 deletions

View File

@ -38,7 +38,7 @@
-type ip_port() :: tuple().
-type cipher() :: map().
-type rfc3339_system_time() :: integer().
-type qos():: integer().
-type qos() :: integer().
-typerefl_from_string({qos/0, emqx_schema, to_qos}).
-typerefl_from_string({duration/0, emqx_schema, to_duration}).

View File

@ -389,7 +389,8 @@ subscribe(ClientId, TopicTables) ->
subscribe([Node | Nodes], ClientId, TopicTables) ->
case wrap_rpc(emqx_management_proto_v1:subscribe(Node, ClientId, TopicTables)) of
{error, _} -> subscribe(Nodes, ClientId, TopicTables);
Re -> Re
{subscribe, Res} ->
{subscribe, Res, Node}
end;
subscribe([], _ClientId, _TopicTables) ->

View File

@ -18,6 +18,7 @@
-behaviour(minirest_api).
-include_lib("typerefl/include/types.hrl").
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/logger.hrl").
@ -25,7 +26,10 @@
-include("emqx_mgmt.hrl").
%% API
-export([api_spec/0]).
-export([ api_spec/0
, paths/0
, schema/1
, fields/1]).
-export([ clients/2
, client/2
@ -67,419 +71,322 @@
<<"{\"code\": \"RESOURCE_NOT_FOUND\", \"reason\": \"Client id not found\"}">>).
api_spec() ->
{apis(), schemas()}.
emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}).
apis() ->
[ clients_api()
, client_api()
, clients_authz_cache_api()
, clients_subscriptions_api()
, subscribe_api()
, unsubscribe_api()
, keepalive_api()
paths() ->
[ "/clients"
, "/clients/:clientid"
, "/clients/:clientid/authz_cache"
, "/clients/:clientid/subscriptions"
, "/clients/:clientid/subscribe"
, "/clients/:clientid/unsubscribe"
, "/clients/:clientid/keepalive"
].
schemas() ->
Client = #{
client => #{
type => object,
properties => emqx_mgmt_util:properties(properties(client))
}
},
AuthzCache = #{
authz_cache => #{
type => object,
properties => emqx_mgmt_util:properties(properties(authz_cache))
}
},
[Client, AuthzCache].
properties(client) ->
[
{awaiting_rel_cnt, integer,
<<"v4 api name [awaiting_rel] Number of awaiting PUBREC packet">>},
{awaiting_rel_max, integer,
<<"v4 api name [max_awaiting_rel]. Maximum allowed number of awaiting PUBREC packet">>},
{clean_start, boolean,
<<"Indicate whether the client is using a brand new session">>},
{clientid, string ,
<<"Client identifier">>},
{connected, boolean,
<<"Whether the client is connected">>},
{connected_at, string ,
<<"Client connection time, rfc3339">>},
{created_at, string ,
<<"Session creation time, rfc3339">>},
{disconnected_at, string ,
<<"Client offline time. It's Only valid and returned when connected is false, rfc3339">>},
{expiry_interval, integer,
<<"Session expiration interval, with the unit of second">>},
{heap_size, integer,
<<"Process heap size with the unit of byte">>},
{inflight_cnt, integer,
<<"Current length of inflight">>},
{inflight_max, integer,
<<"v4 api name [max_inflight]. Maximum length of inflight">>},
{ip_address, string ,
<<"Client's IP address">>},
{port, integer,
<<"Client's port">>},
{is_bridge, boolean,
<<"Indicates whether the client is connectedvia bridge">>},
{keepalive, integer,
<<"keepalive time, with the unit of second">>},
{mailbox_len, integer,
<<"Process mailbox size">>},
{mqueue_dropped, integer,
<<"Number of messages dropped by the message queue due to exceeding the length">>},
{mqueue_len, integer,
<<"Current length of message queue">>},
{mqueue_max, integer,
<<"v4 api name [max_mqueue]. Maximum length of message queue">>},
{node, string ,
<<"Name of the node to which the client is connected">>},
{proto_name, string ,
<<"Client protocol name">>},
{proto_ver, integer,
<<"Protocol version used by the client">>},
{recv_cnt, integer,
<<"Number of TCP packets received">>},
{recv_msg, integer,
<<"Number of PUBLISH packets received">>},
{'recv_msg.qos0', integer,
<<"Number of PUBLISH QoS0 packets received">>},
{'recv_msg.qos1', integer,
<<"Number of PUBLISH QoS1 packets received">>},
{'recv_msg.qos2', integer,
<<"Number of PUBLISH QoS2 packets received">>},
{'recv_msg.dropped', integer,
<<"Number of dropped PUBLISH messages">>},
{'recv_msg.dropped.await_pubrel_timeout', integer,
<<"Number of dropped PUBLISH messages due to waiting PUBREL timeout">>},
{recv_oct, integer,
<<"Number of bytes received by EMQ X Broker (the same below)">>},
{recv_pkt, integer,
<<"Number of MQTT packets received">>},
{reductions, integer,
<<"Erlang reduction">>},
{send_cnt, integer,
<<"Number of TCP packets sent">>},
{send_msg, integer,
<<"Number of PUBLISH messages sent">>},
{'send_msg.qos0', integer,
<<"Number of PUBLISH QoS0 messages sent">>},
{'send_msg.qos1', integer,
<<"Number of PUBLISH QoS1 messages sent">>},
{'send_msg.qos2', integer,
<<"Number of PUBLISH QoS2 messages sent">>},
{'send_msg.dropped', integer,
<<"Number of dropped PUBLISH messages">>},
{'send_msg.dropped.expired', integer,
<<"Number of dropped PUBLISH messages due to expired">>},
{'send_msg.dropped.queue_full', integer,
<<"Number of dropped PUBLISH messages due to queue full">>},
{'send_msg.dropped.too_large', integer,
<<"Number of dropped PUBLISH messages due to packet length too large">>},
{send_oct, integer,
<<"Number of bytes sent">>},
{send_pkt, integer,
<<"Number of MQTT packets sent">>},
{subscriptions_cnt, integer,
<<"Number of subscriptions established by this client.">>},
{subscriptions_max, integer,
<<"v4 api name [max_subscriptions] Maximum number of subscriptions allowed by this client">>},
{username, string ,
<<"User name of client when connecting">>},
{will_msg, string ,
<<"Client will message">>},
{zone, string ,
<<"Indicate the configuration group used by the client">>}
];
properties(authz_cache) ->
[
{access, string, <<"Access type">>},
{result, string, <<"Allow or deny">>},
{topic, string, <<"Topic name">>},
{updated_time, integer, <<"Update time">>}
].
clients_api() ->
Metadata = #{
schema("/clients") ->
#{
'operationId' => clients,
get => #{
description => <<"List clients">>,
parameters => [
#{
name => page,
hoconsc:ref(emqx_dashboard_swagger, page),
hoconsc:ref(emqx_dashboard_swagger, limit),
{node, hoconsc:mk(binary(), #{
in => query,
required => false,
description => <<"Page">>,
schema => #{type => integer}
},
#{
name => limit,
nullable => true,
desc => <<"Node name">>,
example => atom_to_list(node())})},
{username, hoconsc:mk(binary(), #{
in => query,
required => false,
description => <<"Page limit">>,
schema => #{type => integer}
},
#{
name => node,
nullable => true,
desc => <<"User name">>})},
{zone, hoconsc:mk(binary(), #{
in => query,
required => false,
description => <<"Node name">>,
schema => #{type => string}
},
#{
name => username,
nullable => true})},
{ip_address, hoconsc:mk(binary(), #{
in => query,
required => false,
description => <<"User name">>,
schema => #{type => string}
},
#{
name => zone,
nullable => true,
desc => <<"Client's IP address">>,
example => <<"127.0.0.1">>})},
{conn_state, hoconsc:mk(hoconsc:enum([connected, idle, disconnected]), #{
in => query,
required => false,
schema => #{type => string}
},
#{
name => ip_address,
nullable => true,
desc => <<"The current connection status of the client, ",
"the possible values are connected,idle,disconnected">>})},
{clean_start, hoconsc:mk(boolean(), #{
in => query,
required => false,
description => <<"Client's IP address">>,
schema => #{type => string}
},
#{
name => conn_state,
nullable => true,
description => <<"Whether the client uses a new session">>})},
{proto_name, hoconsc:mk(hoconsc:enum(['MQTT', 'CoAP', 'LwM2M', 'MQTT-SN']), #{
in => query,
required => false,
description =>
<<"The current connection status of the client, ",
"the possible values are connected,idle,disconnected">>,
schema => #{type => string, enum => [connected, idle, disconnected]}
},
#{
name => clean_start,
nullable => true,
description => <<"Client protocol name, ",
"the possible values are MQTT,CoAP,LwM2M,MQTT-SN">>})},
{proto_ver, hoconsc:mk(binary(), #{
in => query,
required => false,
description => <<"Whether the client uses a new session">>,
schema => #{type => boolean}
},
#{
name => proto_name,
nullable => true,
desc => <<"Client protocol version">>})},
{like_clientid, hoconsc:mk(binary(), #{
in => query,
required => false,
description =>
<<"Client protocol name, ",
"the possible values are MQTT,CoAP,LwM2M,MQTT-SN">>,
schema => #{type => string, enum => ['MQTT', 'CoAP', 'LwM2M', 'MQTT-SN']}
},
#{
name => proto_ver,
nullable => true,
desc => <<"Fuzzy search of client identifier by substring method">>})},
{like_username, hoconsc:mk(binary(), #{
in => query,
required => false,
description => <<"Client protocol version">>,
schema => #{type => string}
},
#{
name => like_clientid,
nullable => true,
desc => <<"Client user name, fuzzy search by substring">>})},
{gte_created_at, hoconsc:mk(binary(), #{
in => query,
required => false,
description => <<"Fuzzy search of client identifier by substring method">>,
schema => #{type => string}
},
#{
name => like_username,
nullable => true,
desc => <<"Search client session creation time by greater",
" than or equal method, rfc3339 or timestamp(millisecond)">>})},
{lte_created_at, hoconsc:mk(binary(), #{
in => query,
required => false,
description => <<"Client user name, fuzzy search by substring">>,
schema => #{type => string}
},
#{
name => gte_created_at,
nullable => true,
desc => <<"Search client session creation time by less",
" than or equal method, rfc3339 or timestamp(millisecond)">>})},
{gte_connected_at, hoconsc:mk(binary(), #{
in => query,
required => false,
description =>
<<"Search client session creation time by greater than or equal method, "
"rfc3339 or timestamp(millisecond)">>,
schema => #{type => string}
},
#{
name => lte_created_at,
nullable => true,
desc => <<"Search client connection creation time by greater"
" than or equal method, rfc3339 or timestamp(millisecond)">>})},
{lte_connected_at, hoconsc:mk(binary(), #{
in => query,
required => false,
description =>
<<"Search client session creation time by less than or equal method, ",
"rfc3339 or timestamp(millisecond)">>,
schema => #{type => string}
},
#{
name => gte_connected_at,
in => query,
required => false,
description =>
<<"Search client connection creation time by greater than or equal method, ",
"rfc3339 or timestamp(millisecond)">>,
schema => #{type => string}
},
#{
name => lte_connected_at,
in => query,
required => false,
description =>
<<"Search client connection creation time by less than or equal method, ",
"rfc3339 or timestamp(millisecond) ">>,
schema => #{type => string}
}
nullable => true,
desc => <<"Search client connection creation time by less"
" than or equal method, rfc3339 or timestamp(millisecond)">>})}
],
responses => #{
<<"200">> => emqx_mgmt_util:array_schema(client, <<"List clients 200 OK">>),
<<"400">> => emqx_mgmt_util:error_schema( <<"Invalid parameters">>
, ['INVALID_PARAMETER'])}}},
{"/clients", Metadata, clients}.
200 => [
{data, hoconsc:mk(hoconsc:array(hoconsc:ref(?MODULE, client)), #{})},
{meta, hoconsc:mk(hoconsc:ref(?MODULE, meta), #{})}
],
400 =>
emqx_dashboard_swagger:error_codes(
['INVALID_PARAMETER'], <<"Invalid parameters">>)}
}
};
client_api() ->
Metadata = #{
schema("/clients/:clientid") ->
#{
'operationId' => client,
get => #{
description => <<"Get clients info by client ID">>,
parameters => [#{
name => clientid,
in => path,
schema => #{type => string},
required => true
}],
parameters => [{clientid, hoconsc:mk(binary(), #{in => path})}],
responses => #{
<<"404">> => emqx_mgmt_util:error_schema(<<"Client id not found">>),
<<"200">> => emqx_mgmt_util:schema(client, <<"List clients 200 OK">>)}},
200 => hoconsc:mk(hoconsc:ref(?MODULE, client), #{}),
404 => emqx_dashboard_swagger:error_codes(
['CLIENTID_NOT_FOUND'], <<"Client id not found">>)}},
delete => #{
description => <<"Kick out client by client ID">>,
parameters => [#{
name => clientid,
in => path,
schema => #{type => string},
required => true
}],
parameters => [
{clientid, hoconsc:mk(binary(), #{in => path})}],
responses => #{
<<"404">> => emqx_mgmt_util:error_schema(<<"Client id not found">>),
<<"204">> => emqx_mgmt_util:schema(<<"Kick out client successfully">>)}}},
{"/clients/:clientid", Metadata, client}.
204 => <<"Kick out client successfully">>,
404 => emqx_dashboard_swagger:error_codes(
['CLIENTID_NOT_FOUND'], <<"Client id not found">>)
}
}
};
clients_authz_cache_api() ->
Metadata = #{
schema("/clients/:clientid/authz_cache") ->
#{
'operationId' => authz_cache,
get => #{
description => <<"Get client authz cache">>,
parameters => [#{
name => clientid,
in => path,
schema => #{type => string},
required => true
}],
parameters => [{clientid, hoconsc:mk(binary(), #{in => path})}],
responses => #{
<<"404">> => emqx_mgmt_util:error_schema(<<"Client id not found">>),
<<"200">> => emqx_mgmt_util:schema(authz_cache, <<"Get client authz cache">>)}},
200 => hoconsc:mk(hoconsc:ref(?MODULE, authz_cache), #{}),
404 => emqx_dashboard_swagger:error_codes(
['CLIENTID_NOT_FOUND'], <<"Client id not found">>)
}
},
delete => #{
description => <<"Clean client authz cache">>,
parameters => [#{
name => clientid,
in => path,
schema => #{type => string},
required => true
}],
parameters => [{clientid, hoconsc:mk(binary(), #{in => path})}],
responses => #{
<<"404">> => emqx_mgmt_util:error_schema(<<"Client id not found">>),
<<"204">> => emqx_mgmt_util:schema(<<"Clean client authz cache successfully">>)}}},
{"/clients/:clientid/authz_cache", Metadata, authz_cache}.
204 => <<"Kick out client successfully">>,
404 => emqx_dashboard_swagger:error_codes(
['CLIENTID_NOT_FOUND'], <<"Client id not found">>)
}
}
};
clients_subscriptions_api() ->
Metadata = #{
schema("/clients/:clientid/subscriptions") ->
#{
'operationId' => subscriptions,
get => #{
description => <<"Get client subscriptions">>,
parameters => [#{
name => clientid,
in => path,
schema => #{type => string},
required => true
}],
parameters => [{clientid, hoconsc:mk(binary(), #{in => path})}],
responses => #{
<<"200">> =>
emqx_mgmt_util:array_schema(subscription, <<"Get client subscriptions">>)}}
},
{"/clients/:clientid/subscriptions", Metadata, subscriptions}.
200 => hoconsc:mk(hoconsc:array(hoconsc:ref(emqx_mgmt_api_subscriptions, subscription)), #{}),
404 => emqx_dashboard_swagger:error_codes(
['CLIENTID_NOT_FOUND'], <<"Client id not found">>)
}
}
};
unsubscribe_api() ->
Metadata = #{
post => #{
description => <<"Unsubscribe">>,
parameters => [
#{
name => clientid,
in => path,
schema => #{type => string},
required => true
}
],
'requestBody' => emqx_mgmt_util:schema(#{
type => object,
properties => #{
topic => #{
type => string,
description => <<"Topic">>}}}),
responses => #{
<<"404">> => emqx_mgmt_util:error_schema(<<"Client id not found">>),
<<"200">> => emqx_mgmt_util:schema(<<"Unsubscribe ok">>)}}},
{"/clients/:clientid/unsubscribe", Metadata, unsubscribe}.
subscribe_api() ->
Metadata = #{
schema("/clients/:clientid/subscribe") ->
#{
'operationId' => subscribe,
post => #{
description => <<"Subscribe">>,
parameters => [#{
name => clientid,
in => path,
schema => #{type => string},
required => true
}],
'requestBody' => emqx_mgmt_util:schema(#{
type => object,
properties => #{
topic => #{
type => string,
description => <<"Topic">>},
qos => #{
type => integer,
enum => [0, 1, 2],
example => 0,
description => <<"QoS">>}}}),
parameters => [{clientid, hoconsc:mk(binary(), #{in => path})}],
'requestBody' => hoconsc:mk(hoconsc:ref(?MODULE, subscribe)),
responses => #{
<<"404">> => emqx_mgmt_util:error_schema(<<"Client id not found">>),
<<"200">> => emqx_mgmt_util:schema(<<"Subscribe ok">>)}}},
{"/clients/:clientid/subscribe", Metadata, subscribe}.
200 => hoconsc:ref(emqx_mgmt_api_subscriptions, subscription),
404 => emqx_dashboard_swagger:error_codes(
['CLIENTID_NOT_FOUND'], <<"Client id not found">>)
}
}
};
keepalive_api() ->
Metadata = #{
put => #{
description => <<"set the online client keepalive by second ">>,
parameters => [#{
name => clientid,
in => path,
schema => #{type => string},
required => true
},
#{
name => interval,
in => query,
schema => #{type => integer},
required => true
}
],
schema("/clients/:clientid/unsubscribe") ->
#{
'operationId' => unsubscribe,
post => #{
description => <<"Unsubscribe">>,
parameters => [{clientid, hoconsc:mk(binary(), #{in => path})}],
'requestBody' => hoconsc:mk(hoconsc:ref(?MODULE, unsubscribe)),
responses => #{
<<"404">> => emqx_mgmt_util:error_schema(<<"Client id not found">>),
<<"400">> => emqx_mgmt_util:error_schema(<<"">>, ['PARAMS_ERROR']),
<<"200">> => emqx_mgmt_util:schema(<<"ok">>)}}},
{"/clients/:clientid/keepalive", Metadata, set_keepalive}.
204 => <<"Unsubscribe OK">>,
404 => emqx_dashboard_swagger:error_codes(
['CLIENTID_NOT_FOUND'], <<"Client id not found">>)
}
}
};
schema("/clients/:clientid/keepalive") ->
#{
'operationId' => set_keepalive,
put => #{
description => <<"Set the online client keepalive by seconds">>,
parameters => [{clientid, hoconsc:mk(binary(), #{in => path})}],
'requestBody' => hoconsc:mk(hoconsc:ref(?MODULE, keepalive)),
responses => #{
200 => hoconsc:mk(hoconsc:ref(?MODULE, client), #{}),
404 => emqx_dashboard_swagger:error_codes(
['CLIENTID_NOT_FOUND'], <<"Client id not found">>)
}
}
}.
fields(client) ->
[
{awaiting_rel_cnt, hoconsc:mk(integer(), #{desc =>
<<"v4 api name [awaiting_rel] Number of awaiting PUBREC packet">>})},
{awaiting_rel_max, hoconsc:mk(integer(), #{desc =>
<<"v4 api name [max_awaiting_rel]. "
"Maximum allowed number of awaiting PUBREC packet">>})},
{clean_start, hoconsc:mk(boolean(), #{desc =>
<<"Indicate whether the client is using a brand new session">>})},
{clientid, hoconsc:mk(binary(), #{desc => <<"Client identifier">>})},
{connected, hoconsc:mk(boolean(), #{desc => <<"Whether the client is connected">>})},
{connected_at, hoconsc:mk(binary(), #{desc => <<"Client connection time, rfc3339">>})},
{created_at, hoconsc:mk(binary(), #{desc => <<"Session creation time, rfc3339">>})},
{disconnected_at, hoconsc:mk(binary(), #{desc =>
<<"Client offline time."
" It's Only valid and returned when connected is false, rfc3339">>})},
{expiry_interval, hoconsc:mk(integer(), #{desc =>
<<"Session expiration interval, with the unit of second">>})},
{heap_size, hoconsc:mk(integer(), #{desc =>
<<"Process heap size with the unit of byte">>})},
{inflight_cnt, hoconsc:mk(integer(), #{desc => <<"Current length of inflight">>})},
{inflight_max, hoconsc:mk(integer(), #{desc =>
<<"v4 api name [max_inflight]. Maximum length of inflight">>})},
{ip_address, hoconsc:mk(binary(), #{desc => <<"Client's IP address">>})},
{is_bridge, hoconsc:mk(boolean(), #{desc =>
<<"Indicates whether the client is connectedvia bridge">>})},
{keepalive, hoconsc:mk(integer(), #{desc =>
<<"keepalive time, with the unit of second">>})},
{mailbox_len, hoconsc:mk(integer(), #{desc => <<"Process mailbox size">>})},
{mqueue_dropped, hoconsc:mk(integer(), #{desc =>
<<"Number of messages dropped by the message queue due to exceeding the length">>})},
{mqueue_len, hoconsc:mk(integer(), #{desc => <<"Current length of message queue">>})},
{mqueue_max, hoconsc:mk(integer(), #{desc =>
<<"v4 api name [max_mqueue]. Maximum length of message queue">>})},
{node, hoconsc:mk(binary(), #{desc =>
<<"Name of the node to which the client is connected">>})},
{port, hoconsc:mk(integer(), #{desc => <<"Client's port">>})},
{proto_name, hoconsc:mk(binary(), #{desc => <<"Client protocol name">>})},
{proto_ver, hoconsc:mk(integer(), #{desc => <<"Protocol version used by the client">>})},
{recv_cnt, hoconsc:mk(integer(), #{desc => <<"Number of TCP packets received">>})},
{recv_msg, hoconsc:mk(integer(), #{desc => <<"Number of PUBLISH packets received">>})},
{'recv_msg.dropped', hoconsc:mk(integer(), #{desc =>
<<"Number of dropped PUBLISH packets">>})},
{'recv_msg.dropped.await_pubrel_timeout', hoconsc:mk(integer(), #{desc =>
<<"Number of dropped PUBLISH packets due to expired">>})},
{'recv_msg.qos0', hoconsc:mk(integer(), #{desc =>
<<"Number of PUBLISH QoS0 packets received">>})},
{'recv_msg.qos1', hoconsc:mk(integer(), #{desc =>
<<"Number of PUBLISH QoS1 packets received">>})},
{'recv_msg.qos2', hoconsc:mk(integer(), #{desc =>
<<"Number of PUBLISH QoS2 packets received">>})},
{recv_oct, hoconsc:mk(integer(), #{desc =>
<<"Number of bytes received by EMQ X Broker (the same below)">>})},
{recv_pkt, hoconsc:mk(integer(), #{desc => <<"Number of MQTT packets received">>})},
{reductions, hoconsc:mk(integer(), #{desc => <<"Erlang reduction">>})},
{send_cnt, hoconsc:mk(integer(), #{desc => <<"Number of TCP packets sent">>})},
{send_msg, hoconsc:mk(integer(), #{desc => <<"Number of PUBLISH packets sent">>})},
{'send_msg.dropped', hoconsc:mk(integer(), #{desc =>
<<"Number of dropped PUBLISH packets">>})},
{'send_msg.dropped.expired', hoconsc:mk(integer(), #{desc =>
<<"Number of dropped PUBLISH packets due to expired">>})},
{'send_msg.dropped.queue_full', hoconsc:mk(integer(), #{desc =>
<<"Number of dropped PUBLISH packets due to queue full">>})},
{'send_msg.dropped.too_large', hoconsc:mk(integer(), #{desc =>
<<"Number of dropped PUBLISH packets due to packet length too large">>})},
{'send_msg.qos0', hoconsc:mk(integer(), #{desc =>
<<"Number of PUBLISH QoS0 packets sent">>})},
{'send_msg.qos1', hoconsc:mk(integer(), #{desc =>
<<"Number of PUBLISH QoS1 packets sent">>})},
{'send_msg.qos2', hoconsc:mk(integer(), #{desc =>
<<"Number of PUBLISH QoS2 packets sent">>})},
{send_oct, hoconsc:mk(integer(), #{desc => <<"Number of bytes sent">>})},
{send_pkt, hoconsc:mk(integer(), #{desc => <<"Number of MQTT packets sent">>})},
{subscriptions_cnt, hoconsc:mk(integer(), #{desc =>
<<"Number of subscriptions established by this client.">>})},
{subscriptions_max, hoconsc:mk(integer(), #{desc =>
<<"v4 api name [max_subscriptions]",
" Maximum number of subscriptions allowed by this client">>})},
{username, hoconsc:mk(binary(), #{desc => <<"User name of client when connecting">>})},
{will_msg, hoconsc:mk(binary(), #{desc => <<"Client will message">>})},
{zone, hoconsc:mk(binary(), #{desc =>
<<"Indicate the configuration group used by the client">>})}
];
fields(authz_cache) ->
[
{access, hoconsc:mk(binary(), #{desc => <<"Access type">>})},
{result, hoconsc:mk(binary(), #{desc => <<"Allow or deny">>})},
{topic, hoconsc:mk(binary(), #{desc => <<"Topic name">>})},
{updated_time, hoconsc:mk(integer(), #{desc => <<"Update time">>})}
];
fields(keepalive) ->
[
{interval, hoconsc:mk(integer(), #{desc => <<"Keepalive time, with the unit of second">>})}
];
fields(subscribe) ->
[
{topic, hoconsc:mk(binary(), #{desc => <<"Access type">>})},
{qos, hoconsc:mk(emqx_schema:qos(), #{desc => <<"QoS">>})}
];
fields(unsubscribe) ->
[
{topic, hoconsc:mk(binary(), #{desc => <<"Access type">>})}
];
fields(meta) ->
emqx_dashboard_swagger:fields(page) ++
emqx_dashboard_swagger:fields(limit) ++
[{count, hoconsc:mk(integer(), #{example => 1})}].
%%%==============================================================================================
%% parameters trans
clients(get, #{query_string := Qs}) ->
list(emqx_mgmt_api:ensure_timestamp_format(Qs, time_keys())).
list_clients(emqx_mgmt_api:ensure_timestamp_format(Qs, time_keys())).
client(get, #{bindings := Bindings}) ->
lookup(Bindings);
@ -529,13 +436,12 @@ subscriptions(get, #{bindings := #{clientid := ClientID}}) ->
{200, lists:map(Formatter, Subs)}
end.
set_keepalive(put, #{bindings := #{clientid := ClientID}, query_string := Query}) ->
case maps:find(<<"interval">>, Query) of
error -> {404, "Interval Not Found"};
{ok, Interval0} ->
Interval = binary_to_integer(Interval0),
set_keepalive(put, #{bindings := #{clientid := ClientID}, body := Body}) ->
case maps:find(<<"interval">>, Body) of
error -> {400, 'BAD_REQUEST',"Interval Not Found"};
{ok, Interval} ->
case emqx_mgmt:set_keepalive(emqx_mgmt_util:urldecode(ClientID), Interval) of
ok -> {200};
ok -> lookup(#{clientid => ClientID});
{error, not_found} ->{404, ?CLIENT_ID_NOT_FOUND};
{error, Reason} -> {400, #{code => 'PARAMS_ERROR', message => Reason}}
end
@ -544,7 +450,7 @@ set_keepalive(put, #{bindings := #{clientid := ClientID}, query_string := Query}
%%%==============================================================================================
%% api apply
list(Params) ->
list_clients(Params) ->
{Tab, QuerySchema} = ?CLIENT_QS_SCHEMA,
case maps:get(<<"node">>, Params, undefined) of
undefined ->
@ -590,7 +496,7 @@ get_authz_cache(#{clientid := ClientID})->
clean_authz_cache(#{clientid := ClientID}) ->
case emqx_mgmt:clean_authz_cache(ClientID) of
ok ->
{200};
{204};
{error, not_found} ->
{404, ?CLIENT_ID_NOT_FOUND};
{error, Reason} ->
@ -605,8 +511,15 @@ subscribe(#{clientid := ClientID, topic := Topic, qos := Qos}) ->
{error, Reason} ->
Message = list_to_binary(io_lib:format("~p", [Reason])),
{500, #{code => <<"UNKNOW_ERROR">>, message => Message}};
ok ->
{200}
{ok, Node} ->
Response =
#{
clientid => ClientID,
topic => Topic,
qos => Qos,
node => Node
},
{200, Response}
end.
unsubscribe(#{clientid := ClientID, topic := Topic}) ->
@ -614,7 +527,7 @@ unsubscribe(#{clientid := ClientID, topic := Topic}) ->
{error, channel_not_found} ->
{404, ?CLIENT_ID_NOT_FOUND};
{unsubscribe, [{Topic, #{}}]} ->
{200}
{204}
end.
subscribe_batch(#{clientid := ClientID, topics := Topics}) ->
@ -630,10 +543,10 @@ do_subscribe(ClientID, Topic0, Qos) ->
case emqx_mgmt:subscribe(ClientID, TopicTable) of
{error, Reason} ->
{error, Reason};
{subscribe, Subscriptions} ->
{subscribe, Subscriptions, Node} ->
case proplists:is_defined(Topic, Subscriptions) of
true ->
ok;
{ok, Node};
false ->
{error, unknow_error}
end

View File

@ -18,15 +18,14 @@
-behaviour(minirest_api).
-include_lib("typerefl/include/types.hrl").
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-import(emqx_mgmt_util, [ page_schema/1
, error_schema/2
, properties/1
, page_params/0
]).
-export([api_spec/0]).
-export([ api_spec/0
, paths/0
, schema/1
, fields/1]).
-export([subscriptions/2]).
@ -46,73 +45,73 @@
-define(format_fun, {?MODULE, format}).
api_spec() ->
{subscriptions_api(), subscription_schema()}.
emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}).
subscriptions_api() ->
MetaData = #{
paths() ->
["/subscriptions"].
schema("/subscriptions") ->
#{
'operationId' => subscriptions,
get => #{
description => <<"List subscriptions">>,
parameters => parameters(),
responses => #{
<<"200">> => page_schema(subscription),
<<"400">> => error_schema(<<"Invalid parameters">>, ['INVALID_PARAMETER'])
}
}
},
[{"/subscriptions", MetaData, subscriptions}].
200 => hoconsc:mk(hoconsc:array(hoconsc:ref(?MODULE, subscription)), #{})}}
}.
subscription_schema() ->
Props = properties([
{node, string},
{topic, string},
{clientid, string},
{qos, integer, <<>>, [0,1,2]}]),
[#{subscription => #{type => object, properties => Props}}].
fields(subscription) ->
[
{node, hoconsc:mk(binary(), #{desc => <<"Access type">>})},
{topic, hoconsc:mk(binary(), #{desc => <<"Topic name">>})},
{clientid, hoconsc:mk(binary(), #{desc => <<"Client identifier">>})},
{qos, hoconsc:mk(emqx_schema:qos(), #{desc => <<"QoS">>})}
].
parameters() ->
[
#{
name => clientid,
hoconsc:ref(emqx_dashboard_swagger, page),
hoconsc:ref(emqx_dashboard_swagger, limit),
{
node, hoconsc:mk(binary(), #{
in => query,
description => <<"Client ID">>,
schema => #{type => string}
nullable => true,
desc => <<"Node name">>,
example => atom_to_list(node())})
},
#{
name => node,
{
clientid, hoconsc:mk(binary(), #{
in => query,
description => <<"Node name">>,
schema => #{type => string}
nullable => true,
desc => <<"Client ID">>})
},
#{
name => qos,
{
qos, hoconsc:mk(emqx_schema:qos(), #{
in => query,
description => <<"QoS">>,
schema => #{type => integer, enum => [0, 1, 2]}
nullable => true,
desc => <<"QoS">>})
},
#{
name => share_group,
{
topic, hoconsc:mk(binary(), #{
in => query,
description => <<"Shared subscription group name">>,
schema => #{type => string}
nullable => true,
desc => <<"Topic, url encoding">>})
},
#{
name => topic,
{
match_topic, hoconsc:mk(binary(), #{
in => query,
description => <<"Topic, url encoding">>,
schema => #{type => string}
nullable => true,
desc => <<"Match topic string, url encoding">>})
},
{
share_group, hoconsc:mk(binary(), #{
in => query,
nullable => true,
desc => <<"Shared subscription group name">>})
}
#{
name => match_topic,
in => query,
description => <<"Match topic string, url encoding">>,
schema => #{type => string}
} | page_params()
].
subscriptions(get, #{query_string := Params}) ->
list(Params).
list(Params) ->
{Tab, QuerySchema} = ?SUBS_QS_SCHEMA,
case maps:get(<<"node">>, Params, undefined) of
undefined ->

View File

@ -96,8 +96,9 @@ t_clients(_) ->
%% post /clients/:clientid/unsubscribe
UnSubscribePath = emqx_mgmt_api_test_util:api_path(["clients",
binary_to_list(ClientId1), "unsubscribe"]),
UnSubscribeBody = #{topic => Topic},
{ok, _} = emqx_mgmt_api_test_util:request_api(post, UnSubscribePath,
"", AuthHeader, SubscribeBody),
"", AuthHeader, UnSubscribeBody),
timer:sleep(100),
?assertEqual([], emqx_mgmt:lookup_subscriptions(Client1)),
@ -165,15 +166,15 @@ t_query_clients_with_time(_) ->
t_keepalive(_Config) ->
Username = "user_keepalive",
ClientId = "client_keepalive",
AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
Path = emqx_mgmt_api_test_util:api_path(["clients", ClientId, "keepalive"]),
Query = "interval=11",
Body = #{interval => 11},
{error,{"HTTP/1.1",404,"Not Found"}} =
emqx_mgmt_api_test_util:request_api(put, Path, Query, AuthHeader, <<"">>),
emqx_mgmt_api_test_util:request_api(put, Path, <<"">>, AuthHeader, Body),
{ok, C1} = emqtt:start_link(#{username => Username, clientid => ClientId}),
{ok, _} = emqtt:connect(C1),
{ok, Ok} = emqx_mgmt_api_test_util:request_api(put, Path, Query, AuthHeader, <<"">>),
?assertEqual("", Ok),
{ok, NewClient} = emqx_mgmt_api_test_util:request_api(put, Path, <<"">>, AuthHeader, Body),
#{<<"keepalive">> := 11} = emqx_json:decode(NewClient, [return_maps]),
[Pid] = emqx_cm:lookup_channels(list_to_binary(ClientId)),
#{conninfo := #{keepalive := Keepalive}} = emqx_connection:info(Pid),
?assertEqual(11, Keepalive),

View File

@ -31,10 +31,8 @@
, stats/2
]).
-define(API_TAG_PROMETHEUS, [<<"premetheus">>]).
-define(SCHEMA_MODULE, emqx_prometheus_schema).
api_spec() ->
emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}).
@ -47,7 +45,6 @@ schema("/prometheus") ->
#{ 'operationId' => prometheus
, get =>
#{ description => <<"Get Prometheus config info">>
, tags => ?API_TAG_PROMETHEUS
, responses =>
#{200 => prometheus_config_schema()}
}