refactor: improve the _api_clieints implement

This commit is contained in:
JianBo He 2021-11-29 18:07:30 +08:00
parent 01c50992e9
commit f3f3f12635
5 changed files with 479 additions and 299 deletions

View File

@ -70,9 +70,9 @@ gateway.stomp {
## SSL options ## SSL options
## See ${example_common_ssl_options} for more information ## See ${example_common_ssl_options} for more information
ssl.versions = ["tlsv1.3", "tlsv1.2", "tlsv1.1", "tlsv1"] ssl.versions = ["tlsv1.3", "tlsv1.2", "tlsv1.1", "tlsv1"]
ssl.keyfile = "{{ platform_etc_dir }}/certs/key.pem"
ssl.certfile = "{{ platform_etc_dir }}/certs/cert.pem"
ssl.cacertfile = "{{ platform_etc_dir }}/certs/cacert.pem" ssl.cacertfile = "{{ platform_etc_dir }}/certs/cacert.pem"
ssl.certfile = "{{ platform_etc_dir }}/certs/cert.pem"
ssl.keyfile = "{{ platform_etc_dir }}/certs/key.pem"
#ssl.verify = verify_none #ssl.verify = verify_none
#ssl.fail_if_no_peer_cert = false #ssl.fail_if_no_peer_cert = false
#ssl.server_name_indication = disable #ssl.server_name_indication = disable

View File

@ -318,20 +318,26 @@ listeners_schema(?R_REF(_Mod, udp_tcp_listeners)) ->
%% examples %% examples
examples_gateway_confs() -> examples_gateway_confs() ->
#{ a_stomp_gateway => #{ stomp_gateway =>
#{ enable => true #{ summary => <<"A simple STOMP gateway configs">>
, enable_stats => true , value =>
, idle_timeout => <<"30s">> #{ enable => true
, mountpoint => <<"stomp/">> , enable_stats => true
, frame => , idle_timeout => <<"30s">>
#{ max_header => 10 , mountpoint => <<"stomp/">>
, make_header_length => 1024 , frame =>
, max_body_length => 65535 #{ max_header => 10
, make_header_length => 1024
, max_body_length => 65535
}
} }
} }
, a_mqttsn_gateway => , mqttsn_gateway =>
#{ enable => true #{ summary => <<"A simple MQTT-SN gateway configs">>
, enable_stats => true , value =>
#{ enable => true
, enable_stats => true
}
} }
}. }.

View File

@ -16,12 +16,30 @@
-module(emqx_gateway_api_clients). -module(emqx_gateway_api_clients).
-behaviour(minirest_api). -include("emqx_gateway_http.hrl").
-include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl").
-include_lib("emqx/include/emqx_placeholder.hrl").
-include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/logger.hrl").
%% minirest behaviour callbacks -behaviour(minirest_api).
-export([api_spec/0]).
-import(hoconsc, [mk/2, ref/1, ref/2]).
-import(emqx_gateway_http,
[ return_http_error/2
, with_gateway/2
]).
%% minirest/dashbaord_swagger behaviour callbacks
-export([ api_spec/0
, paths/0
, schema/1
]).
-export([ roots/0
, fields/1
]).
%% http handlers %% http handlers
-export([ clients/2 -export([ clients/2
@ -34,27 +52,18 @@
, format_channel_info/1 , format_channel_info/1
]). ]).
-import(emqx_gateway_http,
[ return_http_error/2
, with_gateway/2
, schema_bad_request/0
, schema_not_found/0
, schema_internal_error/0
, schema_no_content/0
]).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% APIs %% APIs
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
api_spec() -> api_spec() ->
{metadata(apis()), []}. emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}).
apis() -> paths() ->
[ {"/gateway/:name/clients", clients} [ "/gateway/:name/clients"
, {"/gateway/:name/clients/:clientid", clients_insta} , "/gateway/:name/clients/:clientid"
, {"/gateway/:name/clients/:clientid/subscriptions", subscriptions} , "/gateway/:name/clients/:clientid/subscriptions"
, {"/gateway/:name/clients/:clientid/subscriptions/:topic", subscriptions} , "/gateway/:name/clients/:clientid/subscriptions/:topic"
]. ].
-define(CLIENT_QS_SCHEMA, -define(CLIENT_QS_SCHEMA,
@ -88,14 +97,16 @@ clients(get, #{ bindings := #{name := Name0}
TabName = emqx_gateway_cm:tabname(info, GwName), TabName = emqx_gateway_cm:tabname(info, GwName),
case maps:get(<<"node">>, Params, undefined) of case maps:get(<<"node">>, Params, undefined) of
undefined -> undefined ->
Response = emqx_mgmt_api:cluster_query(Params, TabName, Response = emqx_mgmt_api:cluster_query(
?CLIENT_QS_SCHEMA, ?query_fun), Params, TabName,
?CLIENT_QS_SCHEMA, ?query_fun),
emqx_mgmt_util:generate_response(Response); emqx_mgmt_util:generate_response(Response);
Node1 -> Node1 ->
Node = binary_to_atom(Node1, utf8), Node = binary_to_atom(Node1, utf8),
ParamsWithoutNode = maps:without([<<"node">>], Params), ParamsWithoutNode = maps:without([<<"node">>], Params),
Response = emqx_mgmt_api:node_query(Node, ParamsWithoutNode, Response = emqx_mgmt_api:node_query(
TabName, ?CLIENT_QS_SCHEMA, ?query_fun), Node, ParamsWithoutNode,
TabName, ?CLIENT_QS_SCHEMA, ?query_fun),
emqx_mgmt_util:generate_response(Response) emqx_mgmt_util:generate_response(Response)
end end
end). end).
@ -105,8 +116,9 @@ clients_insta(get, #{ bindings := #{name := Name0,
}) -> }) ->
ClientId = emqx_mgmt_util:urldecode(ClientId0), ClientId = emqx_mgmt_util:urldecode(ClientId0),
with_gateway(Name0, fun(GwName, _) -> with_gateway(Name0, fun(GwName, _) ->
case emqx_gateway_http:lookup_client(GwName, ClientId, case emqx_gateway_http:lookup_client(
{?MODULE, format_channel_info}) of GwName, ClientId,
{?MODULE, format_channel_info}) of
[ClientInfo] -> [ClientInfo] ->
{200, ClientInfo}; {200, ClientInfo};
[ClientInfo | _More] -> [ClientInfo | _More] ->
@ -154,7 +166,8 @@ subscriptions(post, #{ bindings := #{name := Name0,
{undefined, _} -> {undefined, _} ->
return_http_error(400, "Miss topic property"); return_http_error(400, "Miss topic property");
{Topic, QoS} -> {Topic, QoS} ->
case emqx_gateway_http:client_subscribe(GwName, ClientId, Topic, QoS) of case emqx_gateway_http:client_subscribe(
GwName, ClientId, Topic, QoS) of
{error, Reason} -> {error, Reason} ->
return_http_error(404, Reason); return_http_error(404, Reason);
ok -> ok ->
@ -204,8 +217,9 @@ query(Tab, {Qs, []}, Continuation, Limit) ->
query(Tab, {Qs, Fuzzy}, Continuation, Limit) -> query(Tab, {Qs, Fuzzy}, Continuation, Limit) ->
Ms = qs2ms(Qs), Ms = qs2ms(Qs),
FuzzyFilterFun = fuzzy_filter_fun(Fuzzy), FuzzyFilterFun = fuzzy_filter_fun(Fuzzy),
emqx_mgmt_api:select_table_with_count(Tab, {Ms, FuzzyFilterFun}, Continuation, Limit, emqx_mgmt_api:select_table_with_count(
fun format_channel_info/1). Tab, {Ms, FuzzyFilterFun}, Continuation, Limit,
fun format_channel_info/1).
qs2ms(Qs) -> qs2ms(Qs) ->
{MtchHead, Conds} = qs2ms(Qs, 2, {#{}, []}), {MtchHead, Conds} = qs2ms(Qs, 2, {#{}, []}),
@ -218,8 +232,10 @@ qs2ms([{Key, '=:=', Value} | Rest], N, {MtchHead, Conds}) ->
NMtchHead = emqx_mgmt_util:merge_maps(MtchHead, ms(Key, Value)), NMtchHead = emqx_mgmt_util:merge_maps(MtchHead, ms(Key, Value)),
qs2ms(Rest, N, {NMtchHead, Conds}); qs2ms(Rest, N, {NMtchHead, Conds});
qs2ms([Qs | Rest], N, {MtchHead, Conds}) -> qs2ms([Qs | Rest], N, {MtchHead, Conds}) ->
Holder = binary_to_atom(iolist_to_binary(["$", integer_to_list(N)]), utf8), Holder = binary_to_atom(
NMtchHead = emqx_mgmt_util:merge_maps(MtchHead, ms(element(1, Qs), Holder)), iolist_to_binary(["$", integer_to_list(N)]), utf8),
NMtchHead = emqx_mgmt_util:merge_maps(
MtchHead, ms(element(1, Qs), Holder)),
NConds = put_conds(Qs, Holder, Conds), NConds = put_conds(Qs, Holder, Conds),
qs2ms(Rest, N+1, {NMtchHead, NConds}). qs2ms(Rest, N+1, {NMtchHead, NConds}).
@ -271,12 +287,14 @@ escape(B) when is_binary(B) ->
run_fuzzy_filter(_, []) -> run_fuzzy_filter(_, []) ->
true; true;
run_fuzzy_filter(E = {_, #{clientinfo := ClientInfo}, _}, [{Key, _, RE} | Fuzzy]) -> run_fuzzy_filter(E = {_, #{clientinfo := ClientInfo}, _},
[{Key, _, RE} | Fuzzy]) ->
Val = case maps:get(Key, ClientInfo, "") of Val = case maps:get(Key, ClientInfo, "") of
undefined -> ""; undefined -> "";
V -> V V -> V
end, end,
re:run(Val, RE, [{capture, none}]) == match andalso run_fuzzy_filter(E, Fuzzy). re:run(Val, RE, [{capture, none}]) == match
andalso run_fuzzy_filter(E, Fuzzy).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% format funcs %% format funcs
@ -294,15 +312,19 @@ format_channel_info({_, Infos, Stats} = R) ->
, {port, {peername, ConnInfo, fun peer_to_port/1}} , {port, {peername, ConnInfo, fun peer_to_port/1}}
, {is_bridge, ClientInfo, false} , {is_bridge, ClientInfo, false}
, {connected_at, , {connected_at,
{connected_at, ConnInfo, fun emqx_gateway_utils:unix_ts_to_rfc3339/1}} {connected_at, ConnInfo,
fun emqx_gateway_utils:unix_ts_to_rfc3339/1}}
, {disconnected_at, , {disconnected_at,
{disconnected_at, ConnInfo, fun emqx_gateway_utils:unix_ts_to_rfc3339/1}} {disconnected_at, ConnInfo,
, {connected, {conn_state, Infos, fun conn_state_to_connected/1}} fun emqx_gateway_utils:unix_ts_to_rfc3339/1}}
, {connected, {conn_state, Infos,
fun conn_state_to_connected/1}}
, {keepalive, ClientInfo, 0} , {keepalive, ClientInfo, 0}
, {clean_start, ConnInfo, true} , {clean_start, ConnInfo, true}
, {expiry_interval, ConnInfo, 0} , {expiry_interval, ConnInfo, 0}
, {created_at, , {created_at,
{created_at, SessInfo, fun emqx_gateway_utils:unix_ts_to_rfc3339/1}} {created_at, SessInfo,
fun emqx_gateway_utils:unix_ts_to_rfc3339/1}}
, {subscriptions_cnt, Stats, 0} , {subscriptions_cnt, Stats, 0}
, {subscriptions_max, Stats, infinity} , {subscriptions_max, Stats, infinity}
, {inflight_cnt, Stats, 0} , {inflight_cnt, Stats, 0}
@ -384,275 +406,338 @@ conn_state_to_connected(_) -> false.
%% Swagger defines %% Swagger defines
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
metadata(APIs) -> schema("/gateway/:name/clients") ->
metadata(APIs, []). #{ 'operationId' => clients
metadata([], APIAcc) -> , get =>
lists:reverse(APIAcc); #{ description => <<"Get the gateway client list">>
metadata([{Path, Fun} | More], APIAcc) -> , parameters => params_client_query()
Methods = [get, post, put, delete, patch], , responses =>
Mds = lists:foldl(fun(M, Acc) -> ?STANDARD_RESP(
try #{ 200 => emqx_dashboard_swagger:schema_with_examples(
Acc#{M => swagger(Path, M)} hoconsc:array(ref(client)),
catch examples_client_list())})
error : function_clause -> }
Acc
end
end, #{}, Methods),
metadata(More, [{Path, Mds, Fun} | APIAcc]).
swagger("/gateway/:name/clients", get) ->
#{ description => <<"Get the gateway clients">>
, parameters => params_client_query()
, responses =>
#{ <<"400">> => schema_bad_request()
, <<"404">> => schema_not_found()
, <<"500">> => schema_internal_error()
, <<"200">> => schema_clients_list()
}
}; };
swagger("/gateway/:name/clients/:clientid", get) -> schema("/gateway/:name/clients/:clientid") ->
#{ description => <<"Get the gateway client infomation">> #{ 'operationId' => clients_insta
, parameters => params_client_insta() , get =>
, responses => #{ description => <<"Get the gateway client infomation">>
#{ <<"400">> => schema_bad_request() , parameters => params_client_insta()
, <<"404">> => schema_not_found() , responses =>
, <<"500">> => schema_internal_error() ?STANDARD_RESP(
, <<"200">> => schema_client() #{ 200 => emqx_dashboard_swagger:schema_with_examples(
} ref(client),
examples_client())})
}
, delete =>
#{ description => <<"Kick out the gateway client">>
, parameters => params_client_insta()
, responses =>
?STANDARD_RESP(#{204 => <<"Kicked">>})
}
}; };
swagger("/gateway/:name/clients/:clientid", delete) -> schema("/gateway/:name/clients/:clientid/subscriptions") ->
#{ description => <<"Kick out the gateway client">> #{ 'operationId' => subscriptions
, parameters => params_client_insta() , get =>
, responses => #{ description => <<"Get the gateway client subscriptions">>
#{ <<"400">> => schema_bad_request() , parameters => params_client_insta()
, <<"404">> => schema_not_found() , responses =>
, <<"500">> => schema_internal_error() ?STANDARD_RESP(
, <<"204">> => schema_no_content() #{ 200 => emqx_dashboard_swagger:schema_with_examples(
} hoconsc:array(ref(subscription)),
examples_subsctiption_list())})
}
, post =>
#{ description => <<"Create a subscription membership">>
, parameters => params_client_insta()
%% FIXME:
, requestBody => emqx_dashboard_swagger:schema_with_examples(
ref(subscription),
examples_subsctiption())
, responses =>
?STANDARD_RESP(
#{ 201 => emqx_dashboard_swagger:schema_with_examples(
ref(subscription),
examples_subsctiption())})
}
}; };
swagger("/gateway/:name/clients/:clientid/subscriptions", get) -> schema("/gateway/:name/clients/:clientid/subscriptions/:topic") ->
#{ description => <<"Get the gateway client subscriptions">> #{ 'operationId' => subscriptions
, parameters => params_client_insta() , delete =>
, responses => #{ description => <<"Delete a subscriptions membership">>
#{ <<"400">> => schema_bad_request() , parameters => params_topic_name_in_path() ++ params_client_insta()
, <<"404">> => schema_not_found() , responses =>
, <<"500">> => schema_internal_error() ?STANDARD_RESP(#{204 => <<"Unsubscribed">>})
, <<"200">> => schema_subscription_list() }
}
};
swagger("/gateway/:name/clients/:clientid/subscriptions", post) ->
#{ description => <<"Get the gateway client subscriptions">>
, parameters => params_client_insta()
, requestBody => schema_subscription()
, responses =>
#{ <<"400">> => schema_bad_request()
, <<"404">> => schema_not_found()
, <<"500">> => schema_internal_error()
, <<"204">> => schema_no_content()
}
};
swagger("/gateway/:name/clients/:clientid/subscriptions/:topic", delete) ->
#{ description => <<"Unsubscribe the topic for client">>
, parameters => params_topic_name_in_path() ++ params_client_insta()
, responses =>
#{ <<"400">> => schema_bad_request()
, <<"404">> => schema_not_found()
, <<"500">> => schema_internal_error()
, <<"204">> => schema_no_content()
}
}. }.
params_client_query() -> params_client_query() ->
params_gateway_name_in_path() params_gateway_name_in_path()
++ params_client_searching_in_qs() ++ params_client_searching_in_qs()
++ emqx_mgmt_util:page_params(). ++ params_paging().
params_client_insta() -> params_client_insta() ->
params_clientid_in_path() params_clientid_in_path()
++ params_gateway_name_in_path(). ++ params_gateway_name_in_path().
params_client_searching_in_qs() -> params_client_searching_in_qs() ->
queries( M = #{in => query, nullable => true},
[ {node, string} [ {node,
, {clientid, string} mk(binary(),
, {username, string} M#{desc => <<"Match the client's node name">>})}
, {ip_address, string} , {clientid,
, {conn_state, string} mk(binary(),
, {proto_ver, string} M#{desc => <<"Match the client's ID">>})}
, {clean_start, boolean} , {username,
, {like_clientid, string} mk(binary(),
, {like_username, string} M#{desc => <<"Match the client's Username">>})}
, {gte_created_at, string} , {ip_address,
, {lte_created_at, string} mk(binary(),
, {gte_connected_at, string} M#{desc => <<"Match the client's ip address">>})}
, {lte_connected_at, string} , {conn_state,
]). mk(binary(),
M#{desc => <<"Match the client's connection state">>})}
, {proto_ver,
mk(binary(),
M#{desc => <<"Match the client's protocol version">>})}
, {clean_start,
mk(boolean(),
M#{desc => <<"Match the client's clean start flag">>})}
, {like_clientid,
mk(binary(),
M#{desc => <<"Use sub-string to match client's ID">>})}
, {like_username,
mk(binary(),
M#{desc => <<"Use sub-string to match client's username">>})}
, {gte_created_at,
mk(binary(),
M#{desc => <<"Match the session created datetime greater than "
"a certain value">>})}
, {lte_created_at,
mk(binary(),
M#{desc => <<"Match the session created datetime less than "
"a certain value">>})}
, {gte_connected_at,
mk(binary(),
M#{desc => <<"Match the client socket connected datetime greater "
"than a certain value">>})}
, {lte_connected_at,
mk(binary(),
M#{desc => <<"Match the client socket connected datatime less than "
" a certain value">>})}
].
params_paging() ->
[ {page,
mk(integer(),
#{ in => query
, nullable => true
, desc => <<"Page Index">>})}
, {limit,
mk(integer(),
#{ in => query
, desc => <<"Page Limit">>
, nullable => true})}
].
params_gateway_name_in_path() -> params_gateway_name_in_path() ->
[#{ name => name [{name,
, in => path mk(binary(),
, schema => #{type => string} #{ in => path
, required => true , desc => <<"Gateway Name">>
}]. })}
].
params_clientid_in_path() -> params_clientid_in_path() ->
[#{ name => clientid [{clientid,
, in => path mk(binary(),
, schema => #{type => string} #{ in => path
, required => true , desc => <<"Client ID">>
}]. })}
].
params_topic_name_in_path() -> params_topic_name_in_path() ->
[#{ name => topic [{topic,
, in => path mk(binary(),
, schema => #{type => string} #{ in => path
, required => true , desc => <<"Topic Filter/Name">>
}]. })}
].
queries(Ls) ->
lists:map(fun({K, Type}) ->
#{name => K, in => query,
schema => #{type => Type},
required => false
}
end, Ls).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% schemas %% schemas
schema_clients_list() -> roots() ->
emqx_mgmt_util:page_schema( [ client
#{ type => object , subscription
, properties => properties_client() ].
}
).
schema_client() -> fields(client) ->
emqx_mgmt_util:schema( %% XXX: enum for every protocol's client
#{ type => object [ {node,
, properties => properties_client() mk(string(),
}). #{ desc => <<"Name of the node to which the client is "
"connected">>})}
schema_subscription_list() -> , {clientid,
emqx_mgmt_util:array_schema( mk(string(),
#{ type => object #{ desc => <<"Client identifier">>})}
, properties => properties_subscription() , {username,
}, mk(string(),
<<"Client subscriptions">> #{ desc => <<"Username of client when connecting">>})}
). , {proto_name,
mk(string(),
schema_subscription() -> #{ desc => <<"Client protocol name">>})}
emqx_mgmt_util:schema( , {proto_ver,
#{ type => object mk(string(),
, properties => properties_subscription() #{ desc => <<"Protocol version used by the client">>})}
} , {ip_address,
). mk(string(),
#{ desc => <<"Client's IP address">>})}
, {port,
mk(integer(),
#{ desc => <<"Client's port">>})}
, {is_bridge,
mk(boolean(),
#{ desc => <<"Indicates whether the client is connected via "
"bridge">>})}
, {connected_at,
mk(string(),
#{ desc => <<"Client connection time">>})}
, {disconnected_at,
mk(string(),
#{ desc => <<"Client offline time, This field is only valid and "
"returned when connected is false">>})}
, {connected,
mk(boolean(),
#{ desc => <<"Whether the client is connected">>})}
%% FIXME: the will_msg attribute is not a general attribute
%% for every protocol. But it should be returned to frontend if someone
%% want it
%%
%, {will_msg,
% mk(string(),
% #{ desc => <<"Client will message">>})}
%, {zone,
% mk(string(),
% #{ desc => <<"Indicate the configuration group used by the "
% "client">>})}
, {keepalive,
mk(integer(),
#{ desc => <<"keepalive time, with the unit of second">>})}
, {clean_start,
mk(boolean(),
#{ desc => <<"Indicate whether the client is using a brand "
"new session">>})}
, {expiry_interval,
mk(integer(),
#{ desc => <<"Session expiration interval, with the unit of "
"second">>})}
, {created_at,
mk(string(),
#{ desc => <<"Session creation time">>})}
, {subscriptions_cnt,
mk(integer(),
#{ desc => <<"Number of subscriptions established by this "
"client">>})}
, {subscriptions_max,
mk(integer(),
#{ desc => <<"Maximum number of subscriptions allowed by this "
"client">>})}
, {inflight_cnt,
mk(integer(),
#{ desc => <<"Current length of inflight">>})}
, {inflight_max,
mk(integer(),
#{ desc => <<"Maximum length of inflight">>})}
, {mqueue_len,
mk(integer(),
#{ desc => <<"Current length of message queue">>})}
, {mqueue_max,
mk(integer(),
#{ desc => <<"Maximum length of message queue">>})}
, {mqueue_dropped,
mk(integer(),
#{ desc => <<"Number of messages dropped by the message queue "
"due to exceeding the length">>})}
, {awaiting_rel_cnt,
mk(integer(),
#{ desc => <<"Number of awaiting PUBREC packet">>})}
, {awaiting_rel_max,
mk(integer(),
#{ desc => <<"Maximum allowed number of awaiting PUBREC "
"packet">>})}
, {recv_oct,
mk(integer(),
#{ desc => <<"Number of bytes received by EMQ X Broker">>})}
, {recv_cnt,
mk(integer(),
#{ desc => <<"Number of TCP packets received">>})}
, {recv_pkt,
mk(integer(),
#{ desc => <<"Number of MQTT packets received">>})}
, {recv_msg,
mk(integer(),
#{ desc => <<"Number of PUBLISH packets received">>})}
, {send_oct,
mk(integer(),
#{ desc => <<"Number of bytes sent">>})}
, {send_cnt,
mk(integer(),
#{ desc => <<"Number of TCP packets sent">>})}
, {send_pkt,
mk(integer(),
#{ desc => <<"Number of MQTT packets sent">>})}
, {send_msg,
mk(integer(),
#{ desc => <<"Number of PUBLISH packets sent">>})}
, {mailbox_len,
mk(integer(),
#{ desc => <<"Process mailbox size">>})}
, {heap_size,
mk(integer(),
#{ desc => <<"Process heap size with the unit of byte">>})}
, {reductions,
mk(integer(),
#{ desc => <<"Erlang reduction">>})}
];
fields(subscription) ->
[ {topic,
mk(string(),
#{ desc => <<"Topic Fillter">>})}
, {qos,
mk(integer(),
#{ desc => <<"QoS level, enum: 0, 1, 2">>})}
, {nl,
mk(integer(), %% FIXME: why not boolean?
#{ desc => <<"No Local option, enum: 0, 1">>})}
, {rap,
mk(integer(),
#{ desc => <<"Retain as Published option, enum: 0, 1">>})}
, {rh,
mk(integer(),
#{ desc => <<"Retain Handling option, enum: 0, 1, 2">>})}
, {sub_props,
mk(ref(extra_sub_props),
#{desc => <<"Subscription properties">>})}
];
fields(extra_sub_props) ->
[ {subid,
mk(string(),
#{ desc => <<"Only stomp protocol, an uniquely identity for "
"the subscription. range: 1-65535.">>})}
].
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% properties defines %% examples
properties_client() -> examples_client_list() ->
%% FIXME: enum for every protocol's client #{}.
emqx_mgmt_util:properties(
[ {node, string,
<<"Name of the node to which the client is connected">>}
, {clientid, string,
<<"Client identifier">>}
, {username, string,
<<"Username of client when connecting">>}
, {proto_name, string,
<<"Client protocol name">>}
, {proto_ver, string,
<<"Protocol version used by the client">>}
, {ip_address, string,
<<"Client's IP address">>}
, {port, integer,
<<"Client's port">>}
, {is_bridge, boolean,
<<"Indicates whether the client is connectedvia bridge">>}
, {connected_at, string,
<<"Client connection time">>}
, {disconnected_at, string,
<<"Client offline time, This field is only valid and returned "
"when connected is false">>}
, {connected, boolean,
<<"Whether the client is connected">>}
%% FIXME: the will_msg attribute is not a general attribute
%% for every protocol. But it should be returned to frontend if someone
%% want it
%%
%, {will_msg, string,
% <<"Client will message">>}
%, {zone, string,
% <<"Indicate the configuration group used by the client">>}
, {keepalive, integer,
<<"keepalive time, with the unit of second">>}
, {clean_start, boolean,
<<"Indicate whether the client is using a brand new session">>}
, {expiry_interval, integer,
<<"Session expiration interval, with the unit of second">>}
, {created_at, string,
<<"Session creation time">>}
, {subscriptions_cnt, integer,
<<"Number of subscriptions established by this client">>}
, {subscriptions_max, integer,
<<"v4 api name [max_subscriptions] Maximum number of "
"subscriptions allowed by this client">>}
, {inflight_cnt, integer,
<<"Current length of inflight">>}
, {inflight_max, integer,
<<"v4 api name [max_inflight]. Maximum length of inflight">>}
, {mqueue_len, integer,
<<"Current length of message queue">>}
, {mqueue_max, integer,
<<"v4 api name [max_mqueue]. Maximum length of message queue">>}
, {mqueue_dropped, integer,
<<"Number of messages dropped by the message queue due to "
"exceeding the length">>}
, {awaiting_rel_cnt, integer,
<<"v4 api name [awaiting_rel] Number of awaiting PUBREC packet">>}
, {awaiting_rel_max, integer,
<<"v4 api name [max_awaiting_rel]. Maximum allowed number of "
"awaiting PUBREC packet">>}
, {recv_oct, integer,
<<"Number of bytes received by EMQ X Broker (the same below)">>}
, {recv_cnt, integer,
<<"Number of TCP packets received">>}
, {recv_pkt, integer,
<<"Number of MQTT packets received">>}
, {recv_msg, integer,
<<"Number of PUBLISH packets received">>}
, {send_oct, integer,
<<"Number of bytes sent">>}
, {send_cnt, integer,
<<"Number of TCP packets sent">>}
, {send_pkt, integer,
<<"Number of MQTT packets sent">>}
, {send_msg, integer,
<<"Number of PUBLISH packets sent">>}
, {mailbox_len, integer,
<<"Process mailbox size">>}
, {heap_size, integer,
<<"Process heap size with the unit of byte">>}
, {reductions, integer,
<<"Erlang reduction">>}
]).
properties_subscription() -> examples_client() ->
ExtraProps = [ {subid, string, #{}.
<<"Only stomp protocol, an uniquely identity for "
"the subscription. range: 1-65535.">>} examples_subsctiption_list() ->
], #{}.
emqx_mgmt_util:properties(
[ {topic, string, examples_subsctiption() ->
<<"Topic Fillter">>} #{}.
, {qos, integer,
<<"QoS level, enum: 0, 1, 2">>}
, {nl, integer, %% FIXME: why not boolean?
<<"No Local option, enum: 0, 1">>}
, {rap, integer,
<<"Retain as Published option, enum: 0, 1">>}
, {rh, integer,
<<"Retain Handling option, enum: 0, 1, 2">>}
, {sub_props, object, ExtraProps}
]).

View File

@ -225,7 +225,7 @@ schema("/gateway/:name/listeners") ->
, parameters => params_gateway_name_in_path() , parameters => params_gateway_name_in_path()
, responses => , responses =>
?STANDARD_RESP( ?STANDARD_RESP(
#{ 200 => emqx_dashboard_swagger:schema_with_examples( #{ 200 => emqx_dashboard_swagger:schema_with_example(
hoconsc:array(ref(listener)), hoconsc:array(ref(listener)),
examples_listener_list()) examples_listener_list())
}) })
@ -240,7 +240,7 @@ schema("/gateway/:name/listeners") ->
?STANDARD_RESP( ?STANDARD_RESP(
#{ 201 => emqx_dashboard_swagger:schema_with_examples( #{ 201 => emqx_dashboard_swagger:schema_with_examples(
ref(listener), ref(listener),
examples_listener_list()) examples_listener())
}) })
} }
}; };
@ -580,7 +580,96 @@ common_listener_opts() ->
%% examples %% examples
examples_listener_list() -> examples_listener_list() ->
#{stomp_listeners => [examples_listener()]}. [Config || #{value := Config} <- maps:values(examples_listener())].
examples_listener() -> examples_listener() ->
#{}. #{ tcp_listener=>
#{ summary => <<"A simple tcp listener example">>
, value =>
#{ bind => <<"61613">>
, acceptors => 16
, max_connections => 1024000
, max_conn_rate => 1000
, tcp =>
#{ active_n => 100
, backlog => 1024
, send_timeout => <<"15s">>
, send_timeout_close => true
, recbuf => <<"10KB">>
, sndbuf => <<"10KB">>
, buffer => <<"10KB">>
, high_watermark => <<"1MB">>
, nodelay => false
, reuseaddr => true
}
}
}
, ssl_listener =>
#{ summary => <<"A simple ssl listener example">>
, value =>
#{ bind => <<"61614">>
, acceptors => 16
, max_connections => 1024000
, max_conn_rate => 1000
, access_rules => [<<"allow all">>]
, ssl =>
#{ versions => [<<"tlsv1.3">>, <<"tlsv1.2">>,
<<"tlsv1.1">>, <<"tlsv1">>]
, cacertfile => <<"etc/certs/cacert.pem">>
, certfile => <<"etc/certs/cert.pem">>
, keyfile => <<"etc/certs/key.pem">>
, verify => <<"verify_none">>
, fail_if_no_peer_cert => false
, server_name_indication => disable
}
, tcp =>
#{ active_n => 100
, backlog => 1024
}
}
}
, udp_listener =>
#{ summary => <<"A simple udp listener example">>
, value =>
#{ bind => <<"0.0.0.0:1884">>
, udp =>
#{ active_n => 100
, recbuf => <<"10KB">>
, sndbuf => <<"10KB">>
, buffer => <<"10KB">>
, reuseaddr => true
}
}
}
, dtls_listener =>
#{ summary => <<"A simple dtls listener example">>
, value =>
#{ bind => <<"5684">>
, acceptors => 16
, max_connections => 1024000
, max_conn_rate => 1000
, access_rules => [<<"allow all">>]
, ssl =>
#{ versions => [<<"dtlsv1.2">>, <<"dtlsv1">>]
, cacertfile => <<"etc/certs/cacert.pem">>
, certfile => <<"etc/certs/cert.pem">>
, keyfile => <<"etc/certs/key.pem">>
, verify => <<"verify_none">>
, fail_if_no_peer_cert => false
, server_name_indication => disable
}
, tcp =>
#{ active_n => 100
, backlog => 1024
}
}
}
, dtls_listener_with_psk_ciphers =>
#{ summary => <<"todo">>
, value =>
#{}
}
, lisetner_with_authn =>
#{ summary => <<"todo">>
, value => #{}}
}.

View File

@ -143,9 +143,9 @@ The client just sends its PUBLISH messages to a GW"
sc(hoconsc:array(ref(mqttsn_predefined)), sc(hoconsc:array(ref(mqttsn_predefined)),
#{ default => [] #{ default => []
, desc => , desc =>
"The Pre-defined topic ids and topic names.<br> <<"The Pre-defined topic ids and topic names.<br>
A 'pre-defined' topic id is a topic id whose mapping to a topic name A 'pre-defined' topic id is a topic id whose mapping to a topic name
is known in advance by both the client's application and the gateway" is known in advance by both the clients application and the gateway">>
})} })}
, {listeners, sc(ref(udp_listeners))} , {listeners, sc(ref(udp_listeners))}
] ++ gateway_common_options(); ] ++ gateway_common_options();