%%-------------------------------------------------------------------- %% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. %% You may obtain a copy of the License at %% %% http://www.apache.org/licenses/LICENSE-2.0 %% %% Unless required by applicable law or agreed to in writing, software %% distributed under the License is distributed on an "AS IS" BASIS, %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% See the License for the specific language governing permissions and %% limitations under the License. %%-------------------------------------------------------------------- -module(emqx_mgmt_api_clients). -behaviour(minirest_api). -include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/logger.hrl"). -include("emqx_mgmt.hrl"). %% API -export([api_spec/0]). -export([ clients/2 , client/2 , subscriptions/2 , authz_cache/2 , subscribe/2 , unsubscribe/2 , subscribe_batch/2 ]). -export([ query/4 , format_channel_info/1 ]). %% for batch operation -export([do_subscribe/3]). %% for test suite -export([ unix_ts_to_rfc3339_bin/1 , unix_ts_to_rfc3339_bin/2 , time_string_to_unix_ts_int/1 , time_string_to_unix_ts_int/2 ]). -define(CLIENT_QS_SCHEMA, {emqx_channel_info, [ {<<"node">>, atom} , {<<"username">>, binary} , {<<"zone">>, atom} , {<<"ip_address">>, ip} , {<<"conn_state">>, atom} , {<<"clean_start">>, atom} , {<<"proto_name">>, binary} , {<<"proto_ver">>, integer} , {<<"like_clientid">>, binary} , {<<"like_username">>, binary} , {<<"gte_created_at">>, timestamp} , {<<"lte_created_at">>, timestamp} , {<<"gte_connected_at">>, timestamp} , {<<"lte_connected_at">>, timestamp}]}). -define(query_fun, {?MODULE, query}). -define(format_fun, {?MODULE, format_channel_info}). -define(CLIENT_ID_NOT_FOUND, <<"{\"code\": \"RESOURCE_NOT_FOUND\", \"reason\": \"Client id not found\"}">>). api_spec() -> {apis(), schemas()}. apis() -> [ clients_api() , client_api() , clients_authz_cache_api() , clients_subscriptions_api() , subscribe_api() , unsubscribe_api()]. schemas() -> Client = #{ client => #{ type => object, properties => emqx_mgmt_util:properties(properties(client)) } }, AuthzCache = #{ authz_cache => #{ type => object, properties => emqx_mgmt_util:properties(properties(authz_cache)) } }, [Client, AuthzCache]. properties(client) -> [ {awaiting_rel_cnt, integer, <<"v4 api name [awaiting_rel] Number of awaiting PUBREC packet">>}, {awaiting_rel_max, integer, <<"v4 api name [max_awaiting_rel]. Maximum allowed number of awaiting PUBREC packet">>}, {clean_start, boolean, <<"Indicate whether the client is using a brand new session">>}, {clientid, string , <<"Client identifier">>}, {connected, boolean, <<"Whether the client is connected">>}, {connected_at, string , <<"Client connection time, rfc3339">>}, {created_at, string , <<"Session creation time, rfc3339">>}, {disconnected_at, string , <<"Client offline time, This field is only valid and returned when connected is false, rfc3339">>}, {expiry_interval, integer, <<"Session expiration interval, with the unit of second">>}, {heap_size, integer, <<"Process heap size with the unit of byte">>}, {inflight_cnt, integer, <<"Current length of inflight">>}, {inflight_max, integer, <<"v4 api name [max_inflight]. Maximum length of inflight">>}, {ip_address, string , <<"Client's IP address">>}, {port, integer, <<"Client's port">>}, {is_bridge, boolean, <<"Indicates whether the client is connectedvia bridge">>}, {keepalive, integer, <<"keepalive time, with the unit of second">>}, {mailbox_len, integer, <<"Process mailbox size">>}, {mqueue_dropped, integer, <<"Number of messages dropped by the message queue due to exceeding the length">>}, {mqueue_len, integer, <<"Current length of message queue">>}, {mqueue_max, integer, <<"v4 api name [max_mqueue]. Maximum length of message queue">>}, {node, string , <<"Name of the node to which the client is connected">>}, {proto_name, string , <<"Client protocol name">>}, {proto_ver, integer, <<"Protocol version used by the client">>}, {recv_cnt, integer, <<"Number of TCP packets received">>}, {recv_msg, integer, <<"Number of PUBLISH packets received">>}, {recv_oct, integer, <<"Number of bytes received by EMQ X Broker (the same below)">>}, {recv_pkt, integer, <<"Number of MQTT packets received">>}, {reductions, integer, <<"Erlang reduction">>}, {send_cnt, integer, <<"Number of TCP packets sent">>}, {send_msg, integer, <<"Number of PUBLISH packets sent">>}, {send_oct, integer, <<"Number of bytes sent">>}, {send_pkt, integer, <<"Number of MQTT packets sent">>}, {subscriptions_cnt, integer, <<"Number of subscriptions established by this client.">>}, {subscriptions_max, integer, <<"v4 api name [max_subscriptions] Maximum number of subscriptions allowed by this client">>}, {username, string , <<"User name of client when connecting">>}, {will_msg, string , <<"Client will message">>}, {zone, string , <<"Indicate the configuration group used by the client">>} ]; properties(authz_cache) -> [ {access, string, <<"Access type">>}, {result, string, <<"Allow or deny">>}, {topic, string, <<"Topic name">>}, {updated_time, integer, <<"Update time">>} ]. clients_api() -> Metadata = #{ get => #{ description => <<"List clients">>, parameters => [ #{ name => page, in => query, required => false, description => <<"Page">>, schema => #{type => integer} }, #{ name => limit, in => query, required => false, description => <<"Page limit">>, schema => #{type => integer} }, #{ name => node, in => query, required => false, description => <<"Node name">>, schema => #{type => string} }, #{ name => username, in => query, required => false, description => <<"User name">>, schema => #{type => string} }, #{ name => zone, in => query, required => false, schema => #{type => string} }, #{ name => ip_address, in => query, required => false, description => <<"Client's IP address">>, schema => #{type => string} }, #{ name => conn_state, in => query, required => false, description => <<"The current connection status of the client, the possible values are connected,idle,disconnected">>, schema => #{type => string, enum => [connected, idle, disconnected]} }, #{ name => clean_start, in => query, required => false, description => <<"Whether the client uses a new session">>, schema => #{type => boolean} }, #{ name => proto_name, in => query, required => false, description => <<"Client protocol name, the possible values are MQTT,CoAP,LwM2M,MQTT-SN">>, schema => #{type => string, enum => ['MQTT', 'CoAP', 'LwM2M', 'MQTT-SN']} }, #{ name => proto_ver, in => query, required => false, description => <<"Client protocol version">>, schema => #{type => string} }, #{ name => like_clientid, in => query, required => false, description => <<"Fuzzy search of client identifier by substring method">>, schema => #{type => string} }, #{ name => like_username, in => query, required => false, description => <<"Client user name, fuzzy search by substring">>, schema => #{type => string} }, #{ name => gte_created_at, in => query, required => false, description => <<"Search client session creation time by greater than or equal method, rfc3339 or timestamp(millisecond)">>, schema => #{type => string} }, #{ name => lte_created_at, in => query, required => false, description => <<"Search client session creation time by less than or equal method, rfc3339 or timestamp(millisecond)">>, schema => #{type => string} }, #{ name => gte_connected_at, in => query, required => false, description => <<"Search client connection creation time by greater than or equal method, rfc3339 or timestamp(millisecond)">>, schema => #{type => string} }, #{ name => lte_connected_at, in => query, required => false, description => <<"Search client connection creation time by less than or equal method, rfc3339 or timestamp(millisecond) ">>, schema => #{type => string} } ], responses => #{ <<"200">> => emqx_mgmt_util:array_schema(client, <<"List clients 200 OK">>), <<"400">> => emqx_mgmt_util:error_schema(<<"Invalid parameters">>, ['INVALID_PARAMETER'])}}}, {"/clients", Metadata, clients}. client_api() -> Metadata = #{ get => #{ description => <<"Get clients info by client ID">>, parameters => [#{ name => clientid, in => path, schema => #{type => string}, required => true }], responses => #{ <<"404">> => emqx_mgmt_util:error_schema(<<"Client id not found">>), <<"200">> => emqx_mgmt_util:schema(client, <<"List clients 200 OK">>)}}, delete => #{ description => <<"Kick out client by client ID">>, parameters => [#{ name => clientid, in => path, schema => #{type => string}, required => true }], responses => #{ <<"404">> => emqx_mgmt_util:error_schema(<<"Client id not found">>), <<"200">> => emqx_mgmt_util:schema(client, <<"List clients 200 OK">>)}}}, {"/clients/:clientid", Metadata, client}. clients_authz_cache_api() -> Metadata = #{ get => #{ description => <<"Get client authz cache">>, parameters => [#{ name => clientid, in => path, schema => #{type => string}, required => true }], responses => #{ <<"404">> => emqx_mgmt_util:error_schema(<<"Client id not found">>), <<"200">> => emqx_mgmt_util:schema(authz_cache, <<"Get client authz cache">>)}}, delete => #{ description => <<"Clean client authz cache">>, parameters => [#{ name => clientid, in => path, schema => #{type => string}, required => true }], responses => #{ <<"404">> => emqx_mgmt_util:error_schema(<<"Client id not found">>), <<"200">> => emqx_mgmt_util:schema(<<"Delete clients 200 OK">>)}}}, {"/clients/:clientid/authz_cache", Metadata, authz_cache}. clients_subscriptions_api() -> Metadata = #{ get => #{ description => <<"Get client subscriptions">>, parameters => [#{ name => clientid, in => path, schema => #{type => string}, required => true }], responses => #{ <<"200">> => emqx_mgmt_util:array_schema(subscription, <<"Get client subscriptions">>)}} }, {"/clients/:clientid/subscriptions", Metadata, subscriptions}. unsubscribe_api() -> Metadata = #{ post => #{ description => <<"Unsubscribe">>, parameters => [ #{ name => clientid, in => path, schema => #{type => string}, required => true } ], 'requestBody' => emqx_mgmt_util:schema(#{ type => object, properties => #{ topic => #{ type => string, description => <<"Topic">>}}}), responses => #{ <<"404">> => emqx_mgmt_util:error_schema(<<"Client id not found">>), <<"200">> => emqx_mgmt_util:schema(<<"Unsubscribe ok">>)}}}, {"/clients/:clientid/unsubscribe", Metadata, unsubscribe}. subscribe_api() -> Metadata = #{ post => #{ description => <<"Subscribe">>, parameters => [#{ name => clientid, in => path, schema => #{type => string}, required => true }], 'requestBody' => emqx_mgmt_util:schema(#{ type => object, properties => #{ topic => #{ type => string, description => <<"Topic">>}, qos => #{ type => integer, enum => [0, 1, 2], example => 0, description => <<"QoS">>}}}), responses => #{ <<"404">> => emqx_mgmt_util:error_schema(<<"Client id not found">>), <<"200">> => emqx_mgmt_util:schema(<<"Subscribe ok">>)}}}, {"/clients/:clientid/subscribe", Metadata, subscribe}. %%%============================================================================================== %% parameters trans clients(get, #{query_string := Qs}) -> list(generate_qs(Qs)). client(get, #{bindings := Bindings}) -> lookup(Bindings); client(delete, #{bindings := Bindings}) -> kickout(Bindings). authz_cache(get, #{bindings := Bindings}) -> get_authz_cache(Bindings); authz_cache(delete, #{bindings := Bindings}) -> clean_authz_cache(Bindings). subscribe(post, #{bindings := #{clientid := ClientID}, body := TopicInfo}) -> Topic = maps:get(<<"topic">>, TopicInfo), Qos = maps:get(<<"qos">>, TopicInfo, 0), subscribe(#{clientid => ClientID, topic => Topic, qos => Qos}). unsubscribe(post, #{bindings := #{clientid := ClientID}, body := TopicInfo}) -> Topic = maps:get(<<"topic">>, TopicInfo), unsubscribe(#{clientid => ClientID, topic => Topic}). %% TODO: batch subscribe_batch(post, #{bindings := #{clientid := ClientID}, body := TopicInfos}) -> Topics = [begin Topic = maps:get(<<"topic">>, TopicInfo), Qos = maps:get(<<"qos">>, TopicInfo, 0), #{topic => Topic, qos => Qos} end || TopicInfo <- TopicInfos], subscribe_batch(#{clientid => ClientID, topics => Topics}). subscriptions(get, #{bindings := #{clientid := ClientID}}) -> {Node, Subs0} = emqx_mgmt:list_client_subscriptions(ClientID), Subs = lists:map(fun({Topic, SubOpts}) -> #{node => Node, clientid => ClientID, topic => Topic, qos => maps:get(qos, SubOpts)} end, Subs0), {200, Subs}. %%%============================================================================================== %% api apply list(Params) -> {Tab, QuerySchema} = ?CLIENT_QS_SCHEMA, case maps:get(<<"node">>, Params, undefined) of undefined -> Response = emqx_mgmt_api:cluster_query(Params, Tab, QuerySchema, ?query_fun), emqx_mgmt_util:generate_response(Response); Node1 -> Node = binary_to_atom(Node1, utf8), ParamsWithoutNode = maps:without([<<"node">>], Params), Response = emqx_mgmt_api:node_query(Node, ParamsWithoutNode, Tab, QuerySchema, ?query_fun), emqx_mgmt_util:generate_response(Response) end. lookup(#{clientid := ClientID}) -> case emqx_mgmt:lookup_client({clientid, ClientID}, ?format_fun) of [] -> {404, ?CLIENT_ID_NOT_FOUND}; ClientInfo -> {200, hd(ClientInfo)} end. kickout(#{clientid := ClientID}) -> emqx_mgmt:kickout_client(ClientID), {200}. get_authz_cache(#{clientid := ClientID})-> case emqx_mgmt:list_authz_cache(ClientID) of {error, not_found} -> {404, ?CLIENT_ID_NOT_FOUND}; {error, Reason} -> Message = list_to_binary(io_lib:format("~p", [Reason])), {500, #{code => <<"UNKNOW_ERROR">>, message => Message}}; Caches -> Response = [format_authz_cache(Cache) || Cache <- Caches], {200, Response} end. clean_authz_cache(#{clientid := ClientID}) -> case emqx_mgmt:clean_authz_cache(ClientID) of ok -> {200}; {error, not_found} -> {404, ?CLIENT_ID_NOT_FOUND}; {error, Reason} -> Message = list_to_binary(io_lib:format("~p", [Reason])), {500, #{code => <<"UNKNOW_ERROR">>, message => Message}} end. subscribe(#{clientid := ClientID, topic := Topic, qos := Qos}) -> case do_subscribe(ClientID, Topic, Qos) of {error, channel_not_found} -> {404, ?CLIENT_ID_NOT_FOUND}; {error, Reason} -> Message = list_to_binary(io_lib:format("~p", [Reason])), {500, #{code => <<"UNKNOW_ERROR">>, message => Message}}; ok -> {200} end. unsubscribe(#{clientid := ClientID, topic := Topic}) -> case do_unsubscribe(ClientID, Topic) of {error, channel_not_found} -> {404, ?CLIENT_ID_NOT_FOUND}; {error, Reason} -> Message = list_to_binary(io_lib:format("~p", [Reason])), {500, #{code => <<"UNKNOW_ERROR">>, message => Message}}; {unsubscribe, [{Topic, #{}}]} -> {200} end. subscribe_batch(#{clientid := ClientID, topics := Topics}) -> ArgList = [[ClientID, Topic, Qos] || #{topic := Topic, qos := Qos} <- Topics], emqx_mgmt_util:batch_operation(?MODULE, do_subscribe, ArgList). %%-------------------------------------------------------------------- %% internal function do_subscribe(ClientID, Topic0, Qos) -> {Topic, Opts} = emqx_topic:parse(Topic0), TopicTable = [{Topic, Opts#{qos => Qos}}], case emqx_mgmt:subscribe(ClientID, TopicTable) of {error, Reason} -> {error, Reason}; {subscribe, Subscriptions} -> case proplists:is_defined(Topic, Subscriptions) of true -> ok; false -> {error, unknow_error} end end. do_unsubscribe(ClientID, Topic) -> case emqx_mgmt:unsubscribe(ClientID, Topic) of {error, Reason} -> {error, Reason}; Res -> Res end. %%-------------------------------------------------------------------- %% QueryString Generation (try rfc3339 to timestamp or keep timestamp) time_keys() -> [ <<"gte_created_at">> , <<"lte_created_at">> , <<"gte_connected_at">> , <<"lte_connected_at">>]. generate_qs(Qs) -> Fun = fun (Key, NQs) -> case NQs of %% TimeString likes "2021-01-01T00:00:00.000+08:00" (in rfc3339) %% or "1609430400000" (in millisecond) #{Key := TimeString} -> NQs#{Key => time_string_to_unix_ts_int(TimeString)}; #{} -> NQs end end, lists:foldl(Fun, Qs, time_keys()). %%-------------------------------------------------------------------- %% Query Functions query(Tab, {Qs, []}, Continuation, Limit) -> Ms = qs2ms(Qs), emqx_mgmt_api:select_table_with_count(Tab, Ms, Continuation, Limit, fun format_channel_info/1); query(Tab, {Qs, Fuzzy}, Continuation, Limit) -> Ms = qs2ms(Qs), FuzzyFilterFun = fuzzy_filter_fun(Fuzzy), emqx_mgmt_api:select_table_with_count(Tab, {Ms, FuzzyFilterFun}, Continuation, Limit, fun format_channel_info/1). %%-------------------------------------------------------------------- %% QueryString to Match Spec -spec qs2ms(list()) -> ets:match_spec(). qs2ms(Qs) -> {MtchHead, Conds} = qs2ms(Qs, 2, {#{}, []}), [{{'$1', MtchHead, '_'}, Conds, ['$_']}]. qs2ms([], _, {MtchHead, Conds}) -> {MtchHead, lists:reverse(Conds)}; qs2ms([{Key, '=:=', Value} | Rest], N, {MtchHead, Conds}) -> NMtchHead = emqx_mgmt_util:merge_maps(MtchHead, ms(Key, Value)), qs2ms(Rest, N, {NMtchHead, Conds}); qs2ms([Qs | Rest], N, {MtchHead, Conds}) -> Holder = binary_to_atom(iolist_to_binary(["$", integer_to_list(N)]), utf8), NMtchHead = emqx_mgmt_util:merge_maps(MtchHead, ms(element(1, Qs), Holder)), NConds = put_conds(Qs, Holder, Conds), qs2ms(Rest, N+1, {NMtchHead, NConds}). put_conds({_, Op, V}, Holder, Conds) -> [{Op, Holder, V} | Conds]; put_conds({_, Op1, V1, Op2, V2}, Holder, Conds) -> [{Op2, Holder, V2}, {Op1, Holder, V1} | Conds]. ms(clientid, X) -> #{clientinfo => #{clientid => X}}; ms(username, X) -> #{clientinfo => #{username => X}}; ms(zone, X) -> #{clientinfo => #{zone => X}}; ms(conn_state, X) -> #{conn_state => X}; ms(ip_address, X) -> #{conninfo => #{peername => {X, '_'}}}; ms(clean_start, X) -> #{conninfo => #{clean_start => X}}; ms(proto_name, X) -> #{conninfo => #{proto_name => X}}; ms(proto_ver, X) -> #{conninfo => #{proto_ver => X}}; ms(connected_at, X) -> #{conninfo => #{connected_at => X}}; ms(created_at, X) -> #{session => #{created_at => X}}. %%-------------------------------------------------------------------- %% Match funcs fuzzy_filter_fun(Fuzzy) -> REFuzzy = lists:map(fun({K, like, S}) -> {ok, RE} = re:compile(escape(S)), {K, like, RE} end, Fuzzy), fun(MsRaws) when is_list(MsRaws) -> lists:filter( fun(E) -> run_fuzzy_filter(E, REFuzzy) end , MsRaws) end. escape(B) when is_binary(B) -> re:replace(B, <<"\\\\">>, <<"\\\\\\\\">>, [{return, binary}, global]). run_fuzzy_filter(_, []) -> true; run_fuzzy_filter(E = {_, #{clientinfo := ClientInfo}, _}, [{Key, _, RE} | Fuzzy]) -> Val = case maps:get(Key, ClientInfo, "") of undefined -> ""; V -> V end, re:run(Val, RE, [{capture, none}]) == match andalso run_fuzzy_filter(E, Fuzzy). %%-------------------------------------------------------------------- %% format funcs format_channel_info({_, ClientInfo, ClientStats}) -> StatsMap = maps:without([memory, next_pkt_id, total_heap_size], maps:from_list(ClientStats)), ClientInfoMap0 = maps:fold(fun take_maps_from_inner/3, #{}, ClientInfo), {IpAddress, Port} = peername_dispart(maps:get(peername, ClientInfoMap0)), Connected = maps:get(conn_state, ClientInfoMap0) =:= connected, ClientInfoMap1 = maps:merge(StatsMap, ClientInfoMap0), ClientInfoMap2 = maps:put(node, node(), ClientInfoMap1), ClientInfoMap3 = maps:put(ip_address, IpAddress, ClientInfoMap2), ClientInfoMap4 = maps:put(port, Port, ClientInfoMap3), ClientInfoMap = maps:put(connected, Connected, ClientInfoMap4), RemoveList = [ auth_result , peername , sockname , peerhost , conn_state , send_pend , conn_props , peercert , sockstate , subscriptions , receive_maximum , protocol , is_superuser , sockport , anonymous , mountpoint , socktype , active_n , await_rel_timeout , conn_mod , sockname , retry_interval , upgrade_qos ], TimesKeys = [created_at, connected_at, disconnected_at], %% format timestamp to rfc3339 lists:foldl(fun result_format_time_fun/2 , maps:without(RemoveList, ClientInfoMap) , TimesKeys). %% format func helpers take_maps_from_inner(_Key, Value, Current) when is_map(Value) -> maps:merge(Current, Value); take_maps_from_inner(Key, Value, Current) -> maps:put(Key, Value, Current). result_format_time_fun(Key, NClientInfoMap) -> case NClientInfoMap of #{Key := TimeStamp} -> NClientInfoMap#{Key => unix_ts_to_rfc3339_bin(TimeStamp)}; #{} -> NClientInfoMap end. -spec(peername_dispart(emqx_types:peername()) -> {binary(), inet:port_number()}). peername_dispart({Addr, Port}) -> AddrBinary = list_to_binary(inet:ntoa(Addr)), %% PortBinary = integer_to_binary(Port), {AddrBinary, Port}. format_authz_cache({{PubSub, Topic}, {AuthzResult, Timestamp}}) -> #{ access => PubSub, topic => Topic, result => AuthzResult, updated_time => Timestamp }. %%-------------------------------------------------------------------- %% time format funcs unix_ts_to_rfc3339_bin(TimeStamp) -> unix_ts_to_rfc3339_bin(TimeStamp, millisecond). unix_ts_to_rfc3339_bin(TimeStamp, Unit) when is_integer(TimeStamp) -> list_to_binary(calendar:system_time_to_rfc3339(TimeStamp, [{unit, Unit}])). time_string_to_unix_ts_int(DateTime) -> time_string_to_unix_ts_int(DateTime, millisecond). time_string_to_unix_ts_int(DateTime, Unit) when is_binary(DateTime) -> try binary_to_integer(DateTime) of TimeStamp when is_integer(TimeStamp) -> TimeStamp catch error:badarg -> calendar:rfc3339_to_system_time(binary_to_list(DateTime), [{unit, Unit}]) end.