%%-------------------------------------------------------------------- %% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. %% You may obtain a copy of the License at %% %% http://www.apache.org/licenses/LICENSE-2.0 %% %% Unless required by applicable law or agreed to in writing, software %% distributed under the License is distributed on an "AS IS" BASIS, %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% See the License for the specific language governing permissions and %% limitations under the License. %%-------------------------------------------------------------------- -module(emqx_mgmt_api_clients). -behaviour(minirest_api). -include_lib("typerefl/include/types.hrl"). -include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/emqx_cm.hrl"). -include_lib("hocon/include/hoconsc.hrl"). -include_lib("emqx/include/logger.hrl"). -include("emqx_mgmt.hrl"). %% API -export([ api_spec/0, paths/0, schema/1, fields/1 ]). -export([ clients/2, kickout_clients/2, client/2, subscriptions/2, authz_cache/2, subscribe/2, subscribe_batch/2, unsubscribe/2, unsubscribe_batch/2, set_keepalive/2 ]). -export([ qs2ms/2, run_fuzzy_filter/2, format_channel_info/1, format_channel_info/2 ]). %% for batch operation -export([do_subscribe/3]). -define(TAGS, [<<"Clients">>]). -define(CLIENT_QSCHEMA, [ {<<"node">>, atom}, {<<"username">>, binary}, {<<"ip_address">>, ip}, {<<"conn_state">>, atom}, {<<"clean_start">>, atom}, {<<"proto_ver">>, integer}, {<<"like_clientid">>, binary}, {<<"like_username">>, binary}, {<<"gte_created_at">>, timestamp}, {<<"lte_created_at">>, timestamp}, {<<"gte_connected_at">>, timestamp}, {<<"lte_connected_at">>, timestamp} ]). -define(FORMAT_FUN, {?MODULE, format_channel_info}). 
-define(CLIENTID_NOT_FOUND, #{
    code => 'CLIENTID_NOT_FOUND',
    message => <<"Client ID not found">>
}).

%% @doc Build the OpenAPI spec of this module from `paths/0' and `schema/1'.
api_spec() ->
    emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true, translate_body => true}).

%% @doc Every HTTP path handled by this module.
paths() ->
    [
        "/clients",
        "/clients/kickout/bulk",
        "/clients/:clientid",
        "/clients/:clientid/authorization/cache",
        "/clients/:clientid/subscriptions",
        "/clients/:clientid/subscribe",
        "/clients/:clientid/subscribe/bulk",
        "/clients/:clientid/unsubscribe",
        "/clients/:clientid/unsubscribe/bulk",
        "/clients/:clientid/keepalive"
    ].

%% @doc Per-path schema: operation id, parameters, request body and responses.
schema("/clients") ->
    #{
        'operationId' => clients,
        get => #{
            description => ?DESC(list_clients),
            tags => ?TAGS,
            parameters => [
                hoconsc:ref(emqx_dashboard_swagger, page),
                hoconsc:ref(emqx_dashboard_swagger, limit),
                {node,
                    hoconsc:mk(binary(), #{
                        in => query,
                        required => false,
                        desc => <<"Node name">>,
                        example => <<"emqx@127.0.0.1">>
                    })},
                {username,
                    hoconsc:mk(binary(), #{
                        in => query,
                        required => false,
                        desc => <<"User name">>
                    })},
                {ip_address,
                    hoconsc:mk(binary(), #{
                        in => query,
                        required => false,
                        desc => <<"Client's IP address">>,
                        example => <<"127.0.0.1">>
                    })},
                {conn_state,
                    hoconsc:mk(hoconsc:enum([connected, idle, disconnected]), #{
                        in => query,
                        required => false,
                        desc =>
                            <<"The current connection status of the client, ",
                                "the possible values are connected,idle,disconnected">>
                    })},
                {clean_start,
                    hoconsc:mk(boolean(), #{
                        in => query,
                        required => false,
                        description => <<"Whether the client uses a new session">>
                    })},
                {proto_ver,
                    hoconsc:mk(binary(), #{
                        in => query,
                        required => false,
                        desc => <<"Client protocol version">>
                    })},
                {like_clientid,
                    hoconsc:mk(binary(), #{
                        in => query,
                        required => false,
                        desc => <<"Fuzzy search `clientid` as substring">>
                    })},
                {like_username,
                    hoconsc:mk(binary(), #{
                        in => query,
                        required => false,
                        desc => <<"Fuzzy search `username` as substring">>
                    })},
                {gte_created_at,
                    hoconsc:mk(emqx_utils_calendar:epoch_millisecond(), #{
                        in => query,
                        required => false,
                        desc =>
                            <<"Search client session creation time by greater",
                                " than or equal method, rfc3339 or timestamp(millisecond)">>
                    })},
                {lte_created_at,
                    hoconsc:mk(emqx_utils_calendar:epoch_millisecond(), #{
                        in => query,
                        required => false,
                        desc =>
                            <<"Search client session creation time by less",
                                " than or equal method, rfc3339 or timestamp(millisecond)">>
                    })},
                {gte_connected_at,
                    hoconsc:mk(emqx_utils_calendar:epoch_millisecond(), #{
                        in => query,
                        required => false,
                        desc => <<
                            "Search client connection creation time by greater"
                            " than or equal method, rfc3339 or timestamp(epoch millisecond)"
                        >>
                    })},
                {lte_connected_at,
                    hoconsc:mk(emqx_utils_calendar:epoch_millisecond(), #{
                        in => query,
                        required => false,
                        desc => <<
                            "Search client connection creation time by less"
                            " than or equal method, rfc3339 or timestamp(millisecond)"
                        >>
                    })}
            ],
            responses => #{
                200 =>
                    emqx_dashboard_swagger:schema_with_example(?R_REF(clients), #{
                        <<"data">> => [client_example()],
                        <<"meta">> => #{
                            <<"count">> => 1,
                            <<"limit">> => 50,
                            <<"page">> => 1,
                            <<"hasnext">> => false
                        }
                    }),
                400 =>
                    emqx_dashboard_swagger:error_codes(
                        ['INVALID_PARAMETER'], <<"Invalid parameters">>
                    )
            }
        }
    };
schema("/clients/kickout/bulk") ->
    #{
        'operationId' => kickout_clients,
        post => #{
            description => ?DESC(kickout_clients),
            tags => ?TAGS,
            %% Example values are binaries so they are rendered as JSON strings
            %% rather than as lists of character codes.
            'requestBody' => emqx_dashboard_swagger:schema_with_example(
                hoconsc:array(binary()),
                [<<"emqx_clientid_985bb09d">>, <<"emqx_clientid_211cc01c">>]
            ),
            responses => #{
                204 => <<"Kick out clients successfully">>
            }
        }
    };
schema("/clients/:clientid") ->
    #{
        'operationId' => client,
        get => #{
            description => ?DESC(clients_info_from_id),
            tags => ?TAGS,
            parameters => clientid_path_params(),
            responses => #{
                200 => emqx_dashboard_swagger:schema_with_example(
                    ?R_REF(client), client_example()
                ),
                404 => clientid_not_found_response()
            }
        },
        delete => #{
            description => ?DESC(kick_client_id),
            tags => ?TAGS,
            parameters => clientid_path_params(),
            responses => #{
                204 => <<"Kick out client successfully">>,
                404 => clientid_not_found_response()
            }
        }
    };
schema("/clients/:clientid/authorization/cache") ->
    #{
        'operationId' => authz_cache,
        get => #{
            description => ?DESC(get_authz_cache),
            tags => ?TAGS,
            parameters => clientid_path_params(),
            responses => #{
                200 => hoconsc:mk(hoconsc:ref(?MODULE, authz_cache), #{}),
                404 => clientid_not_found_response()
            }
        },
        delete => #{
            description => ?DESC(clean_authz_cache),
            tags => ?TAGS,
            parameters => clientid_path_params(),
            responses => #{
                204 => <<"Clean client authz cache successfully">>,
                404 => clientid_not_found_response()
            }
        }
    };
schema("/clients/:clientid/subscriptions") ->
    #{
        'operationId' => subscriptions,
        get => #{
            description => ?DESC(get_client_subs),
            tags => ?TAGS,
            parameters => clientid_path_params(),
            responses => #{
                200 => hoconsc:mk(
                    hoconsc:array(hoconsc:ref(emqx_mgmt_api_subscriptions, subscription)),
                    #{}
                ),
                404 => clientid_not_found_response()
            }
        }
    };
schema("/clients/:clientid/subscribe") ->
    #{
        'operationId' => subscribe,
        post => #{
            description => ?DESC(subscribe),
            tags => ?TAGS,
            parameters => clientid_path_params(),
            'requestBody' => hoconsc:mk(hoconsc:ref(?MODULE, subscribe)),
            responses => #{
                200 => hoconsc:ref(emqx_mgmt_api_subscriptions, subscription),
                404 => clientid_not_found_response()
            }
        }
    };
schema("/clients/:clientid/subscribe/bulk") ->
    #{
        'operationId' => subscribe_batch,
        post => #{
            description => ?DESC(subscribe_g),
            tags => ?TAGS,
            parameters => clientid_path_params(),
            'requestBody' => hoconsc:mk(hoconsc:array(hoconsc:ref(?MODULE, subscribe))),
            responses => #{
                200 => hoconsc:array(hoconsc:ref(emqx_mgmt_api_subscriptions, subscription)),
                404 => clientid_not_found_response()
            }
        }
    };
schema("/clients/:clientid/unsubscribe") ->
    #{
        'operationId' => unsubscribe,
        post => #{
            description => ?DESC(unsubscribe),
            tags => ?TAGS,
            parameters => clientid_path_params(),
            'requestBody' => hoconsc:mk(hoconsc:ref(?MODULE, unsubscribe)),
            responses => #{
                204 => <<"Unsubscribe OK">>,
                404 => clientid_not_found_response()
            }
        }
    };
schema("/clients/:clientid/unsubscribe/bulk") ->
    #{
        'operationId' => unsubscribe_batch,
        post => #{
            description => ?DESC(unsubscribe_g),
            tags => ?TAGS,
            parameters => clientid_path_params(),
            'requestBody' => hoconsc:mk(hoconsc:array(hoconsc:ref(?MODULE, unsubscribe))),
            responses => #{
                204 => <<"Unsubscribe OK">>,
                404 => clientid_not_found_response()
            }
        }
    };
schema("/clients/:clientid/keepalive") ->
    #{
        'operationId' => set_keepalive,
        put => #{
            description => ?DESC(set_keepalive_seconds),
            tags => ?TAGS,
            hidden => true,
            parameters => clientid_path_params(),
            'requestBody' => hoconsc:mk(hoconsc:ref(?MODULE, keepalive)),
            responses => #{
                200 => emqx_dashboard_swagger:schema_with_example(
                    ?R_REF(client), client_example()
                ),
                404 => clientid_not_found_response()
            }
        }
    }.

%% Parameter list shared by every `/clients/:clientid...' endpoint.
clientid_path_params() ->
    [{clientid, hoconsc:mk(binary(), #{in => path})}].

%% 404 response schema shared by every endpoint that addresses a single client.
clientid_not_found_response() ->
    emqx_dashboard_swagger:error_codes(
        ['CLIENTID_NOT_FOUND'], <<"Client ID not found">>
    ).

%% @doc Field definitions of the structs referenced from `schema/1'.
fields(clients) ->
    [
        {data, hoconsc:mk(hoconsc:array(?REF(client)), #{})},
        {meta, hoconsc:mk(hoconsc:ref(emqx_dashboard_swagger, meta), #{})}
    ];
fields(client) ->
    [
        {awaiting_rel_cnt,
            hoconsc:mk(integer(), #{
                desc => <<"v4 api name [awaiting_rel] Number of awaiting PUBREC packet">>
            })},
        {awaiting_rel_max,
            hoconsc:mk(integer(), #{
                desc => <<
                    "v4 api name [max_awaiting_rel]. "
                    "Maximum allowed number of awaiting PUBREC packet"
                >>
            })},
        {clean_start,
            hoconsc:mk(boolean(), #{
                desc => <<"Indicate whether the client is using a brand new session">>
            })},
        {clientid, hoconsc:mk(binary(), #{desc => <<"Client identifier">>})},
        {connected, hoconsc:mk(boolean(), #{desc => <<"Whether the client is connected">>})},
        {connected_at,
            hoconsc:mk(
                emqx_utils_calendar:epoch_millisecond(),
                #{desc => <<"Client connection time, rfc3339 or timestamp(millisecond)">>}
            )},
        {created_at,
            hoconsc:mk(
                emqx_utils_calendar:epoch_millisecond(),
                #{desc => <<"Session creation time, rfc3339 or timestamp(millisecond)">>}
            )},
        {disconnected_at,
            hoconsc:mk(emqx_utils_calendar:epoch_millisecond(), #{
                desc => <<
                    "Client offline time."
                    " It's Only valid and returned when connected is false, rfc3339 or timestamp(millisecond)"
                >>
            })},
        {expiry_interval,
            hoconsc:mk(integer(), #{
                desc => <<"Session expiration interval, with the unit of second">>
            })},
        {heap_size,
            hoconsc:mk(integer(), #{
                desc => <<"Process heap size with the unit of byte">>
            })},
        {inflight_cnt, hoconsc:mk(integer(), #{desc => <<"Current length of inflight">>})},
        {inflight_max,
            hoconsc:mk(integer(), #{
                desc => <<"v4 api name [max_inflight]. Maximum length of inflight">>
            })},
        {ip_address, hoconsc:mk(binary(), #{desc => <<"Client's IP address">>})},
        {is_bridge,
            hoconsc:mk(boolean(), #{
                desc => <<"Indicates whether the client is connected via bridge">>
            })},
        {keepalive,
            hoconsc:mk(integer(), #{
                desc => <<"keepalive time, with the unit of second">>
            })},
        {mailbox_len, hoconsc:mk(integer(), #{desc => <<"Process mailbox size">>})},
        {mqueue_dropped,
            hoconsc:mk(integer(), #{
                desc =>
                    <<"Number of messages dropped by the message queue due to exceeding the length">>
            })},
        {mqueue_len, hoconsc:mk(integer(), #{desc => <<"Current length of message queue">>})},
        {mqueue_max,
            hoconsc:mk(integer(), #{
                desc => <<"v4 api name [max_mqueue]. Maximum length of message queue">>
            })},
        {node,
            hoconsc:mk(binary(), #{
                desc => <<"Name of the node to which the client is connected">>
            })},
        {port, hoconsc:mk(integer(), #{desc => <<"Client's port">>})},
        {proto_name, hoconsc:mk(binary(), #{desc => <<"Client protocol name">>})},
        {proto_ver, hoconsc:mk(integer(), #{desc => <<"Protocol version used by the client">>})},
        {recv_cnt, hoconsc:mk(integer(), #{desc => <<"Number of TCP packets received">>})},
        {recv_msg, hoconsc:mk(integer(), #{desc => <<"Number of PUBLISH packets received">>})},
        {'recv_msg.dropped',
            hoconsc:mk(integer(), #{
                desc => <<"Number of dropped PUBLISH packets">>
            })},
        {'recv_msg.dropped.await_pubrel_timeout',
            hoconsc:mk(integer(), #{
                desc => <<"Number of dropped PUBLISH packets due to expired">>
            })},
        {'recv_msg.qos0',
            hoconsc:mk(integer(), #{
                desc => <<"Number of PUBLISH QoS0 packets received">>
            })},
        {'recv_msg.qos1',
            hoconsc:mk(integer(), #{
                desc => <<"Number of PUBLISH QoS1 packets received">>
            })},
        {'recv_msg.qos2',
            hoconsc:mk(integer(), #{
                desc => <<"Number of PUBLISH QoS2 packets received">>
            })},
        {recv_oct, hoconsc:mk(integer(), #{desc => <<"Number of bytes received">>})},
        {recv_pkt, hoconsc:mk(integer(), #{desc => <<"Number of MQTT packets received">>})},
        {reductions, hoconsc:mk(integer(), #{desc => <<"Erlang reduction">>})},
        {send_cnt, hoconsc:mk(integer(), #{desc => <<"Number of TCP packets sent">>})},
        {send_msg, hoconsc:mk(integer(), #{desc => <<"Number of PUBLISH packets sent">>})},
        {'send_msg.dropped',
            hoconsc:mk(integer(), #{
                desc => <<"Number of dropped PUBLISH packets">>
            })},
        {'send_msg.dropped.expired',
            hoconsc:mk(integer(), #{
                desc => <<"Number of dropped PUBLISH packets due to expired">>
            })},
        {'send_msg.dropped.queue_full',
            hoconsc:mk(integer(), #{
                desc => <<"Number of dropped PUBLISH packets due to queue full">>
            })},
        {'send_msg.dropped.too_large',
            hoconsc:mk(integer(), #{
                desc => <<"Number of dropped PUBLISH packets due to packet length too large">>
            })},
        {'send_msg.qos0',
            hoconsc:mk(integer(), #{
                desc => <<"Number of PUBLISH QoS0 packets sent">>
            })},
        {'send_msg.qos1',
            hoconsc:mk(integer(), #{
                desc => <<"Number of PUBLISH QoS1 packets sent">>
            })},
        {'send_msg.qos2',
            hoconsc:mk(integer(), #{
                desc => <<"Number of PUBLISH QoS2 packets sent">>
            })},
        {send_oct, hoconsc:mk(integer(), #{desc => <<"Number of bytes sent">>})},
        {send_pkt, hoconsc:mk(integer(), #{desc => <<"Number of MQTT packets sent">>})},
        {subscriptions_cnt,
            hoconsc:mk(integer(), #{
                desc => <<"Number of subscriptions established by this client.">>
            })},
        {subscriptions_max,
            hoconsc:mk(integer(), #{
                desc =>
                    <<"v4 api name [max_subscriptions]",
                        " Maximum number of subscriptions allowed by this client">>
            })},
        {username, hoconsc:mk(binary(), #{desc => <<"User name of client when connecting">>})},
        {mountpoint, hoconsc:mk(binary(), #{desc => <<"Topic mountpoint">>})}
    ];
fields(authz_cache) ->
    [
        {access, hoconsc:mk(binary(), #{desc => <<"Access type">>, example => <<"publish">>})},
        %% The enum previously advertised `denny'; the value documented by the
        %% description and example is `deny'.
        {result,
            hoconsc:mk(hoconsc:enum([allow, deny]), #{
                desc => <<"Allow or deny">>, example => <<"allow">>
            })},
        {topic, hoconsc:mk(binary(), #{desc => <<"Topic name">>, example => <<"testtopic/1">>})},
        {updated_time,
            hoconsc:mk(integer(), #{desc => <<"Update time">>, example => 1687850712989})}
    ];
fields(keepalive) ->
    [
        {interval,
            hoconsc:mk(range(0, 65535), #{desc => <<"Keepalive time, with the unit of second">>})}
    ];
fields(subscribe) ->
    [
        {topic,
            hoconsc:mk(binary(), #{
                required => true, desc => <<"Topic">>, example => <<"testtopic/#">>
            })},
        {qos, hoconsc:mk(emqx_schema:qos(), #{default => 0, desc => <<"QoS">>})},
        {nl, hoconsc:mk(integer(), #{default => 0, desc => <<"No Local">>})},
        {rap, hoconsc:mk(integer(), #{default => 0, desc => <<"Retain as Published">>})},
        {rh, hoconsc:mk(integer(), #{default => 0, desc => <<"Retain Handling">>})}
    ];
fields(unsubscribe) ->
    [
        {topic, hoconsc:mk(binary(), #{desc => <<"Topic">>, example => <<"testtopic/#">>})}
    ].
%%%==============================================================================================
%% parameters trans

%% `GET /clients': list clients filtered by the query string.
clients(get, #{query_string := QString}) ->
    list_clients(QString).

%% `POST /clients/kickout/bulk': kick out all client ids given in the body.
kickout_clients(post, #{body := ClientIDs}) ->
    case emqx_mgmt:kickout_clients(ClientIDs) of
        ok ->
            {204};
        {error, Reason} ->
            {500, #{
                code => <<"UNKNOWN_ERROR">>,
                message => iolist_to_binary(io_lib:format("~p", [Reason]))
            }}
    end.

%% `GET|DELETE /clients/:clientid': look up or kick out one client.
client(get, #{bindings := Path}) ->
    lookup(Path);
client(delete, #{bindings := Path}) ->
    kickout(Path).

%% `GET|DELETE /clients/:clientid/authorization/cache'.
authz_cache(get, #{bindings := Path}) ->
    get_authz_cache(Path);
authz_cache(delete, #{bindings := Path}) ->
    clean_authz_cache(Path).

%% `POST /clients/:clientid/subscribe': subscribe the client to one topic.
subscribe(post, #{bindings := #{clientid := ClientID}, body := TopicInfo}) ->
    SubOpts = to_topic_info(TopicInfo),
    subscribe(SubOpts#{clientid => ClientID}).

%% `POST /clients/:clientid/subscribe/bulk': subscribe the client to many topics.
subscribe_batch(post, #{bindings := #{clientid := ClientID}, body := TopicInfos}) ->
    subscribe_batch(#{
        clientid => ClientID,
        topics => [to_topic_info(Info) || Info <- TopicInfos]
    }).

%% `POST /clients/:clientid/unsubscribe': unsubscribe the client from one topic.
unsubscribe(post, #{bindings := #{clientid := ClientID}, body := TopicInfo}) ->
    unsubscribe(#{clientid => ClientID, topic => maps:get(<<"topic">>, TopicInfo)}).

%% `POST /clients/:clientid/unsubscribe/bulk': entries without a `topic' key
%% are skipped by the comprehension's pattern.
unsubscribe_batch(post, #{bindings := #{clientid := ClientID}, body := TopicInfos}) ->
    TopicNames = [Name || #{<<"topic">> := Name} <- TopicInfos],
    unsubscribe_batch(#{clientid => ClientID, topics => TopicNames}).

%% `GET /clients/:clientid/subscriptions': list the client's subscriptions.
subscriptions(get, #{bindings := #{clientid := ClientID}}) ->
    case emqx_mgmt:list_client_subscriptions(ClientID) of
        {error, not_found} ->
            {404, ?CLIENTID_NOT_FOUND};
        [] ->
            {200, []};
        {Node, Subs} ->
            ToView = fun({Topic, SubOpts}) ->
                emqx_mgmt_api_subscriptions:format(Node, {{Topic, ClientID}, SubOpts})
            end,
            {200, lists:map(ToView, Subs)}
    end.
%% `PUT /clients/:clientid/keepalive': set the keepalive interval of a client
%% and, on success, return the refreshed client info.
set_keepalive(put, #{bindings := #{clientid := ClientID}, body := Body}) ->
    case maps:find(<<"interval">>, Body) of
        error ->
            %% The message is a binary (not a charlist) so that it is rendered
            %% as a JSON string, like every other message in this module.
            {400, 'BAD_REQUEST', <<"Interval Not Found">>};
        {ok, Interval} ->
            case emqx_mgmt:set_keepalive(ClientID, Interval) of
                ok -> lookup(#{clientid => ClientID});
                {error, not_found} -> {404, ?CLIENTID_NOT_FOUND};
                {error, Reason} -> {400, #{code => 'PARAM_ERROR', message => Reason}}
            end
    end.

%%%==============================================================================================
%% api apply

%% Query the channel info table, cluster-wide or on the node named by the
%% `node' query parameter, and translate the result into an HTTP reply.
list_clients(QString) ->
    Result =
        case maps:get(<<"node">>, QString, undefined) of
            undefined ->
                Options = #{fast_total_counting => true},
                emqx_mgmt_api:cluster_query(
                    ?CHAN_INFO_TAB,
                    QString,
                    ?CLIENT_QSCHEMA,
                    fun ?MODULE:qs2ms/2,
                    fun ?MODULE:format_channel_info/2,
                    Options
                );
            Node0 ->
                %% Only existing atoms are accepted for the node name; anything
                %% else is reported as an invalid node.
                case emqx_utils:safe_to_existing_atom(Node0) of
                    {ok, Node1} ->
                        QStringWithoutNode = maps:without([<<"node">>], QString),
                        emqx_mgmt_api:node_query(
                            Node1,
                            ?CHAN_INFO_TAB,
                            QStringWithoutNode,
                            ?CLIENT_QSCHEMA,
                            fun ?MODULE:qs2ms/2,
                            fun ?MODULE:format_channel_info/2
                        );
                    {error, _} ->
                        {error, Node0, {badrpc, <<"invalid node">>}}
                end
        end,
    case Result of
        {error, page_limit_invalid} ->
            {400, #{code => <<"INVALID_PARAMETER">>, message => <<"page_limit_invalid">>}};
        {error, invalid_query_string_param, {Key, ExpectedType, ActualValue}} ->
            Message = list_to_binary(
                io_lib:format(
                    "the ~s parameter expected type is ~s, but the value is ~s",
                    [Key, ExpectedType, emqx_utils_conv:str(ActualValue)]
                )
            ),
            {400, #{code => <<"INVALID_PARAMETER">>, message => Message}};
        {error, Node, {badrpc, R}} ->
            Message = list_to_binary(io_lib:format("bad rpc call ~p, Reason ~p", [Node, R])),
            {500, #{code => <<"NODE_DOWN">>, message => Message}};
        Response ->
            {200, Response}
    end.

%% Look up one client by id; replies 404 when no channel info is found,
%% otherwise 200 with the first formatted entry.
lookup(#{clientid := ClientID}) ->
    case emqx_mgmt:lookup_client({clientid, ClientID}, ?FORMAT_FUN) of
        [] ->
            {404, ?CLIENTID_NOT_FOUND};
        [ClientInfo | _] ->
            {200, ClientInfo}
    end.
%% Kick out one client; anything other than `not_found' is reported as 204.
kickout(#{clientid := ClientID}) ->
    case emqx_mgmt:kickout_client(ClientID) of
        {error, not_found} -> {404, ?CLIENTID_NOT_FOUND};
        _ -> {204}
    end.

%% Return the formatted authorization cache entries of a client.
get_authz_cache(#{clientid := ClientID}) ->
    case emqx_mgmt:list_authz_cache(ClientID) of
        {error, not_found} ->
            {404, ?CLIENTID_NOT_FOUND};
        {error, Reason} ->
            unknown_error_response(Reason);
        Caches ->
            {200, lists:map(fun format_authz_cache/1, Caches)}
    end.

%% Drop the authorization cache of a client.
clean_authz_cache(#{clientid := ClientID}) ->
    case emqx_mgmt:clean_authz_cache(ClientID) of
        ok -> {204};
        {error, not_found} -> {404, ?CLIENTID_NOT_FOUND};
        {error, Reason} -> unknown_error_response(Reason)
    end.

%% Subscribe a client to a single topic using the `qos', `nl', `rap' and `rh'
%% options found in the request map.
subscribe(#{clientid := ClientID, topic := Topic} = Sub) ->
    SubOpts = maps:with([qos, nl, rap, rh], Sub),
    case do_subscribe(ClientID, Topic, SubOpts) of
        {error, channel_not_found} -> {404, ?CLIENTID_NOT_FOUND};
        {error, Reason} -> unknown_error_response(Reason);
        {ok, SubInfo} -> {200, SubInfo}
    end.

%% Subscribe a client to several topics in one batch operation.
subscribe_batch(#{clientid := ClientID, topics := Topics}) ->
    %% We use emqx_channel instead of emqx_channel_info (used by the emqx_mgmt:lookup_client/2),
    %% as the emqx_channel_info table will only be populated after the hook `client.connected`
    %% has returned. So if one want to subscribe topics in this hook, it will fail.
    case ets:lookup(?CHAN_TAB, ClientID) of
        [] ->
            {404, ?CLIENTID_NOT_FOUND};
        _ ->
            Calls = [
                [ClientID, Topic, maps:with([qos, nl, rap, rh], Sub)]
             || #{topic := Topic} = Sub <- Topics
            ],
            {200, emqx_mgmt_util:batch_operation(?MODULE, do_subscribe, Calls)}
    end.

%% Unsubscribe a client from a single topic.
unsubscribe(#{clientid := ClientID, topic := Topic}) ->
    case do_unsubscribe(ClientID, Topic) of
        {error, channel_not_found} -> {404, ?CLIENTID_NOT_FOUND};
        {unsubscribe, [{Topic, #{}}]} -> {204}
    end.

%% 500 reply carrying the `~p'-formatted reason.
unknown_error_response(Reason) ->
    {500, #{
        code => <<"UNKNOWN_ERROR">>,
        message => iolist_to_binary(io_lib:format("~p", [Reason]))
    }}.
%% Unsubscribe a client from several topics; the client must be known to
%% `lookup/1', whose 404 reply is passed through unchanged.
unsubscribe_batch(#{clientid := ClientID, topics := Topics}) ->
    case lookup(#{clientid => ClientID}) of
        {200, _} ->
            _ = emqx_mgmt:unsubscribe_batch(ClientID, Topics),
            {204};
        {404, _} = NotFound ->
            NotFound
    end.

%%--------------------------------------------------------------------
%% internal function

%% Parse the topic, subscribe the client to it and confirm that the parsed
%% topic shows up in the returned subscription list.
do_subscribe(ClientID, Topic0, Options) ->
    {Topic, Opts} = emqx_topic:parse(Topic0, Options),
    case emqx_mgmt:subscribe(ClientID, [{Topic, Opts}]) of
        {error, Reason} ->
            {error, Reason};
        {subscribe, Subscriptions, Node} ->
            case proplists:is_defined(Topic, Subscriptions) of
                false -> {error, unknow_error};
                true -> {ok, Options#{node => Node, clientid => ClientID, topic => Topic}}
            end
    end.

-spec do_unsubscribe(emqx_types:clientid(), emqx_types:topic()) ->
    {unsubscribe, _} | {error, channel_not_found}.
%% Thin pass-through: the result of `emqx_mgmt:unsubscribe/2' is returned as is.
do_unsubscribe(ClientID, Topic) ->
    emqx_mgmt:unsubscribe(ClientID, Topic).

%%--------------------------------------------------------------------
%% QueryString to Match Spec

-spec qs2ms(atom(), {list(), list()}) -> emqx_mgmt_api:match_spec_and_filter().
%% Build the match spec from the exact/range conditions and the fuzzy filter
%% from the `like' conditions.
qs2ms(_Tab, {QString, FuzzyQString}) ->
    #{
        match_spec => qs2ms(QString),
        fuzzy_fun => fuzzy_filter_fun(FuzzyQString)
    }.

-spec qs2ms(list()) -> ets:match_spec().
%% Placeholders start at `$2' because `$1' is used for the first tuple element.
qs2ms(Qs) ->
    {Head, Guards} = qs2ms(Qs, 2, {#{}, []}),
    [{{'$1', Head, '_'}, Guards, ['$_']}].

%% Equality conditions are folded straight into the match head; every other
%% condition binds a fresh `$N' placeholder and contributes guard(s).
qs2ms([], _, {Head, Guards}) ->
    {Head, lists:reverse(Guards)};
qs2ms([{Key, '=:=', Value} | Rest], N, {Head, Guards}) ->
    qs2ms(Rest, N, {emqx_mgmt_util:merge_maps(Head, ms(Key, Value)), Guards});
qs2ms([Cond | Rest], N, {Head, Guards}) ->
    Holder = list_to_atom("$" ++ integer_to_list(N)),
    Head1 = emqx_mgmt_util:merge_maps(Head, ms(element(1, Cond), Holder)),
    qs2ms(Rest, N + 1, {Head1, put_conds(Cond, Holder, Guards)}).
%% Prepend the guard(s) of one condition to the (reversed) guard accumulator.
put_conds({_, Op, V}, Holder, Conds) ->
    [{Op, Holder, V} | Conds];
put_conds({_, Op1, V1, Op2, V2}, Holder, Conds) ->
    [{Op2, Holder, V2}, {Op1, Holder, V1} | Conds].

%% Match-head fragment that places `X' at the position of the given query key.
ms(clientid, X) ->
    #{clientinfo => #{clientid => X}};
ms(username, X) ->
    #{clientinfo => #{username => X}};
ms(conn_state, X) ->
    #{conn_state => X};
ms(ip_address, X) ->
    #{conninfo => #{peername => {X, '_'}}};
ms(clean_start, X) ->
    #{conninfo => #{clean_start => X}};
ms(proto_ver, X) ->
    #{conninfo => #{proto_ver => X}};
ms(connected_at, X) ->
    #{conninfo => #{connected_at => X}};
ms(created_at, X) ->
    #{session => #{created_at => X}}.

%%--------------------------------------------------------------------
%% Match funcs

%% No fuzzy conditions means no filter function at all.
fuzzy_filter_fun([]) ->
    undefined;
fuzzy_filter_fun(Fuzzy) ->
    {fun ?MODULE:run_fuzzy_filter/2, [Fuzzy]}.

%% True when every `like' condition's substring occurs in the corresponding
%% `clientinfo' value; a missing or `undefined' value is treated as `<<>>'.
run_fuzzy_filter(_, []) ->
    true;
run_fuzzy_filter(E = {_, #{clientinfo := ClientInfo}, _}, [{Key, like, SubStr} | Fuzzy]) ->
    Subject =
        case maps:get(Key, ClientInfo, <<>>) of
            undefined -> <<>>;
            Found -> Found
        end,
    case binary:match(Subject, SubStr) of
        nomatch -> false;
        _ -> run_fuzzy_filter(E, Fuzzy)
    end.

%%--------------------------------------------------------------------
%% format funcs

%% Format a channel info entry, using the local node as the fallback node.
format_channel_info(ChannInfo = {_, _ClientInfo, _ClientStats}) ->
    format_channel_info(node(), ChannInfo).
%% Flatten a channel info entry into a single map for the REST API:
%% nested maps are merged into the top level, stats are added (info values
%% win on key clashes), internal keys are dropped, timestamps are rendered
%% as rfc3339 and `undefined' values become `null'.
format_channel_info(WhichNode, {_, ClientInfo0, ClientStats}) ->
    Node = maps:get(node, ClientInfo0, WhichNode),
    Pruned0 = lists:foldl(
        fun emqx_utils_maps:deep_remove/2,
        ClientInfo0,
        [[conninfo, clientid], [conninfo, username]]
    ),
    Stats = maps:without(
        [memory, next_pkt_id, total_heap_size],
        maps:from_list(ClientStats)
    ),
    Pruned = maps:remove(will_msg, Pruned0),
    Flat = maps:fold(fun take_maps_from_inner/3, #{}, Pruned),
    {IpAddress, Port} = peername_dispart(maps:get(peername, Flat)),
    Connected = maps:get(conn_state, Flat) =:= connected,
    Merged = maps:merge(Stats, Flat),
    Located = Merged#{node => Node, ip_address => IpAddress, port => Port},
    Converted = convert_expiry_interval_unit(Located),
    Complete = Converted#{connected => Connected},
    Hidden = [
        auth_result,
        peername,
        sockname,
        peerhost,
        conn_state,
        send_pend,
        conn_props,
        peercert,
        sockstate,
        subscriptions,
        receive_maximum,
        protocol,
        is_superuser,
        sockport,
        anonymous,
        socktype,
        active_n,
        await_rel_timeout,
        conn_mod,
        retry_interval,
        upgrade_qos,
        zone,
        %% session_id, defined in emqx_session.erl
        id,
        acl
    ],
    Visible = maps:without(Hidden, Complete),
    %% format timestamp to rfc3339
    WithTimes = lists:foldl(
        fun result_format_time_fun/2,
        Visible,
        [created_at, connected_at, disconnected_at]
    ),
    result_format_undefined_to_null(WithTimes).

%% format func helpers

%% Fold step: a map value is merged into the accumulator (its own key is
%% dropped), any other value is stored under its key.
take_maps_from_inner(_Key, Value, Current) when is_map(Value) ->
    maps:merge(Current, Value);
take_maps_from_inner(Key, Value, Current) ->
    Current#{Key => Value}.

%% Replace the timestamp stored under `Key', if present, by its rfc3339 form.
result_format_time_fun(Key, NClientInfoMap) ->
    case maps:find(Key, NClientInfoMap) of
        {ok, TimeStamp} ->
            NClientInfoMap#{Key => emqx_utils_calendar:epoch_to_rfc3339(TimeStamp)};
        error ->
            NClientInfoMap
    end.
%% Replace every `undefined' value of the map by `null'.
result_format_undefined_to_null(Map) ->
    maps:map(
        fun
            (_, undefined) -> null;
            (_, V) -> V
        end,
        Map
    ).

-spec peername_dispart(emqx_types:peername()) -> {binary(), inet:port_number()}.
%% Split a peername into the textual IP address (as a binary) and the port.
peername_dispart({Addr, Port}) ->
    {list_to_binary(inet:ntoa(Addr)), Port}.

%% Divide `expiry_interval' by 1000 (integer division); the key must be present.
convert_expiry_interval_unit(ClientInfoMap = #{expiry_interval := Interval}) ->
    ClientInfoMap#{expiry_interval := Interval div 1000}.

%% Turn one authz cache entry into the map shape described by `fields(authz_cache)'.
format_authz_cache({{PubSub, Topic}, {AuthzResult, Timestamp}}) ->
    #{
        access => PubSub,
        topic => Topic,
        result => AuthzResult,
        updated_time => Timestamp
    }.

%% Keep only the subscription-related keys of a request body and convert
%% them with `emqx_utils_maps:safe_atom_key_map/1'.
to_topic_info(Data) ->
    M = maps:with([<<"topic">>, <<"qos">>, <<"nl">>, <<"rap">>, <<"rh">>], Data),
    emqx_utils_maps:safe_atom_key_map(M).

%% Example client object used in the generated API documentation.
client_example() ->
    #{
        <<"recv_oct">> => 49,
        <<"expiry_interval">> => 0,
        <<"created_at">> => <<"2024-01-01T12:34:56.789+08:00">>,
        <<"awaiting_rel_max">> => 100,
        <<"send_msg">> => 0,
        <<"enable_authn">> => true,
        <<"send_msg.qos2">> => 0,
        <<"peerport">> => 52571,
        <<"connected_at">> => <<"2024-01-01T12:34:56.789+08:00">>,
        <<"send_msg.dropped.too_large">> => 0,
        <<"inflight_cnt">> => 0,
        <<"keepalive">> => 60,
        <<"node">> => <<"emqx@127.0.0.1">>,
        <<"send_cnt">> => 4,
        <<"recv_msg.dropped.await_pubrel_timeout">> => 0,
        <<"recv_msg.dropped">> => 0,
        <<"inflight_max">> => 32,
        <<"proto_name">> => <<"MQTT">>,
        <<"send_msg.dropped.expired">> => 0,
        <<"awaiting_rel_cnt">> => 0,
        <<"mqueue_max">> => 1000,
        <<"send_oct">> => 31,
        <<"send_msg.dropped.queue_full">> => 0,
        <<"mqueue_len">> => 0,
        <<"heap_size">> => 610,
        <<"is_persistent">> => false,
        <<"send_msg.qos0">> => 0,
        <<"clean_start">> => true,
        <<"mountpoint">> => <<"null">>,
        <<"proto_ver">> => 5,
        <<"ip_address">> => <<"127.0.0.1">>,
        <<"mqueue_dropped">> => 0,
        <<"port">> => 52571,
        <<"listener">> => <<"tcp:default">>,
        <<"recv_msg.qos2">> => 0,
        <<"recv_msg.qos1">> => 0,
        <<"is_bridge">> => false,
        <<"subscriptions_cnt">> => 1,
        <<"username">> => null,
        <<"send_msg.dropped">> => 0,
        <<"send_pkt">> => 4,
        <<"subscriptions_max">> => <<"infinity">>,
        <<"send_msg.qos1">> => 0,
        <<"connected">> => true,
        <<"reductions">> => 6836,
        <<"mailbox_len">> => 0,
        %% A binary, like every other textual value here: a charlist would be
        %% a list of integers rather than a string.
        <<"clientid">> => <<"01">>,
        <<"recv_msg">> => 0,
        <<"recv_pkt">> => 4,
        <<"recv_cnt">> => 4,
        <<"recv_msg.qos0">> => 0
    }.