1062 lines
38 KiB
Erlang
1062 lines
38 KiB
Erlang
%%--------------------------------------------------------------------
|
|
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
%%
|
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
%% you may not use this file except in compliance with the License.
|
|
%% You may obtain a copy of the License at
|
|
%%
|
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
%%
|
|
%% Unless required by applicable law or agreed to in writing, software
|
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
%% See the License for the specific language governing permissions and
|
|
%% limitations under the License.
|
|
%%--------------------------------------------------------------------
|
|
|
|
-module(emqx_mgmt_api_clients).
|
|
|
|
-behaviour(minirest_api).
|
|
|
|
-include_lib("typerefl/include/types.hrl").
|
|
-include_lib("emqx/include/emqx.hrl").
|
|
-include_lib("emqx/include/emqx_cm.hrl").
|
|
-include_lib("hocon/include/hoconsc.hrl").
|
|
|
|
-include_lib("emqx/include/logger.hrl").
|
|
|
|
-include("emqx_mgmt.hrl").
|
|
|
|
%% API
|
|
-export([
|
|
api_spec/0,
|
|
paths/0,
|
|
schema/1,
|
|
fields/1
|
|
]).
|
|
|
|
-export([
|
|
clients/2,
|
|
kickout_clients/2,
|
|
client/2,
|
|
subscriptions/2,
|
|
authz_cache/2,
|
|
subscribe/2,
|
|
subscribe_batch/2,
|
|
unsubscribe/2,
|
|
unsubscribe_batch/2,
|
|
set_keepalive/2
|
|
]).
|
|
|
|
-export([
|
|
qs2ms/2,
|
|
run_fuzzy_filter/2,
|
|
format_channel_info/1,
|
|
format_channel_info/2
|
|
]).
|
|
|
|
%% for batch operation
|
|
-export([do_subscribe/3]).
|
|
|
|
-define(TAGS, [<<"Clients">>]).
|
|
|
|
-define(CLIENT_QSCHEMA, [
|
|
{<<"node">>, atom},
|
|
{<<"username">>, binary},
|
|
{<<"ip_address">>, ip},
|
|
{<<"conn_state">>, atom},
|
|
{<<"clean_start">>, atom},
|
|
{<<"proto_ver">>, integer},
|
|
{<<"like_clientid">>, binary},
|
|
{<<"like_username">>, binary},
|
|
{<<"gte_created_at">>, timestamp},
|
|
{<<"lte_created_at">>, timestamp},
|
|
{<<"gte_connected_at">>, timestamp},
|
|
{<<"lte_connected_at">>, timestamp}
|
|
]).
|
|
|
|
-define(FORMAT_FUN, {?MODULE, format_channel_info}).
|
|
|
|
-define(CLIENTID_NOT_FOUND, #{
|
|
code => 'CLIENTID_NOT_FOUND',
|
|
message => <<"Client ID not found">>
|
|
}).
|
|
|
|
api_spec() ->
|
|
emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true, translate_body => true}).
|
|
|
|
paths() ->
|
|
[
|
|
"/clients",
|
|
"/clients/kickout/bulk",
|
|
"/clients/:clientid",
|
|
"/clients/:clientid/authorization/cache",
|
|
"/clients/:clientid/subscriptions",
|
|
"/clients/:clientid/subscribe",
|
|
"/clients/:clientid/subscribe/bulk",
|
|
"/clients/:clientid/unsubscribe",
|
|
"/clients/:clientid/unsubscribe/bulk",
|
|
"/clients/:clientid/keepalive"
|
|
].
|
|
|
|
schema("/clients") ->
|
|
#{
|
|
'operationId' => clients,
|
|
get => #{
|
|
description => ?DESC(list_clients),
|
|
tags => ?TAGS,
|
|
parameters => [
|
|
hoconsc:ref(emqx_dashboard_swagger, page),
|
|
hoconsc:ref(emqx_dashboard_swagger, limit),
|
|
{node,
|
|
hoconsc:mk(binary(), #{
|
|
in => query,
|
|
required => false,
|
|
desc => <<"Node name">>,
|
|
example => <<"emqx@127.0.0.1">>
|
|
})},
|
|
{username,
|
|
hoconsc:mk(binary(), #{
|
|
in => query,
|
|
required => false,
|
|
desc => <<"User name">>
|
|
})},
|
|
{ip_address,
|
|
hoconsc:mk(binary(), #{
|
|
in => query,
|
|
required => false,
|
|
desc => <<"Client's IP address">>,
|
|
example => <<"127.0.0.1">>
|
|
})},
|
|
{conn_state,
|
|
hoconsc:mk(hoconsc:enum([connected, idle, disconnected]), #{
|
|
in => query,
|
|
required => false,
|
|
desc =>
|
|
<<"The current connection status of the client, ",
|
|
"the possible values are connected,idle,disconnected">>
|
|
})},
|
|
{clean_start,
|
|
hoconsc:mk(boolean(), #{
|
|
in => query,
|
|
required => false,
|
|
description => <<"Whether the client uses a new session">>
|
|
})},
|
|
{proto_ver,
|
|
hoconsc:mk(binary(), #{
|
|
in => query,
|
|
required => false,
|
|
desc => <<"Client protocol version">>
|
|
})},
|
|
{like_clientid,
|
|
hoconsc:mk(binary(), #{
|
|
in => query,
|
|
required => false,
|
|
desc => <<"Fuzzy search `clientid` as substring">>
|
|
})},
|
|
{like_username,
|
|
hoconsc:mk(binary(), #{
|
|
in => query,
|
|
required => false,
|
|
desc => <<"Fuzzy search `username` as substring">>
|
|
})},
|
|
{gte_created_at,
|
|
hoconsc:mk(emqx_utils_calendar:epoch_millisecond(), #{
|
|
in => query,
|
|
required => false,
|
|
desc =>
|
|
<<"Search client session creation time by greater",
|
|
" than or equal method, rfc3339 or timestamp(millisecond)">>
|
|
})},
|
|
{lte_created_at,
|
|
hoconsc:mk(emqx_utils_calendar:epoch_millisecond(), #{
|
|
in => query,
|
|
required => false,
|
|
desc =>
|
|
<<"Search client session creation time by less",
|
|
" than or equal method, rfc3339 or timestamp(millisecond)">>
|
|
})},
|
|
{gte_connected_at,
|
|
hoconsc:mk(emqx_utils_calendar:epoch_millisecond(), #{
|
|
in => query,
|
|
required => false,
|
|
desc => <<
|
|
"Search client connection creation time by greater"
|
|
" than or equal method, rfc3339 or timestamp(epoch millisecond)"
|
|
>>
|
|
})},
|
|
{lte_connected_at,
|
|
hoconsc:mk(emqx_utils_calendar:epoch_millisecond(), #{
|
|
in => query,
|
|
required => false,
|
|
desc => <<
|
|
"Search client connection creation time by less"
|
|
" than or equal method, rfc3339 or timestamp(millisecond)"
|
|
>>
|
|
})}
|
|
],
|
|
responses => #{
|
|
200 =>
|
|
emqx_dashboard_swagger:schema_with_example(?R_REF(clients), #{
|
|
<<"data">> => [client_example()],
|
|
<<"meta">> => #{
|
|
<<"count">> => 1,
|
|
<<"limit">> => 50,
|
|
<<"page">> => 1,
|
|
<<"hasnext">> => false
|
|
}
|
|
}),
|
|
400 =>
|
|
emqx_dashboard_swagger:error_codes(
|
|
['INVALID_PARAMETER'], <<"Invalid parameters">>
|
|
)
|
|
}
|
|
}
|
|
};
|
|
schema("/clients/kickout/bulk") ->
|
|
#{
|
|
'operationId' => kickout_clients,
|
|
post => #{
|
|
description => ?DESC(kickout_clients),
|
|
tags => ?TAGS,
|
|
'requestBody' => emqx_dashboard_swagger:schema_with_example(
|
|
hoconsc:array(binary()),
|
|
["emqx_clientid_985bb09d", "emqx_clientid_211cc01c"]
|
|
),
|
|
responses => #{
|
|
204 => <<"Kick out clients successfully">>
|
|
}
|
|
}
|
|
};
|
|
schema("/clients/:clientid") ->
|
|
#{
|
|
'operationId' => client,
|
|
get => #{
|
|
description => ?DESC(clients_info_from_id),
|
|
tags => ?TAGS,
|
|
parameters => [{clientid, hoconsc:mk(binary(), #{in => path})}],
|
|
responses => #{
|
|
200 => emqx_dashboard_swagger:schema_with_example(
|
|
?R_REF(client),
|
|
client_example()
|
|
),
|
|
404 => emqx_dashboard_swagger:error_codes(
|
|
['CLIENTID_NOT_FOUND'], <<"Client ID not found">>
|
|
)
|
|
}
|
|
},
|
|
delete => #{
|
|
description => ?DESC(kick_client_id),
|
|
tags => ?TAGS,
|
|
parameters => [
|
|
{clientid, hoconsc:mk(binary(), #{in => path})}
|
|
],
|
|
responses => #{
|
|
204 => <<"Kick out client successfully">>,
|
|
404 => emqx_dashboard_swagger:error_codes(
|
|
['CLIENTID_NOT_FOUND'], <<"Client ID not found">>
|
|
)
|
|
}
|
|
}
|
|
};
|
|
schema("/clients/:clientid/authorization/cache") ->
|
|
#{
|
|
'operationId' => authz_cache,
|
|
get => #{
|
|
description => ?DESC(get_authz_cache),
|
|
tags => ?TAGS,
|
|
parameters => [{clientid, hoconsc:mk(binary(), #{in => path})}],
|
|
responses => #{
|
|
200 => hoconsc:mk(hoconsc:ref(?MODULE, authz_cache), #{}),
|
|
404 => emqx_dashboard_swagger:error_codes(
|
|
['CLIENTID_NOT_FOUND'], <<"Client ID not found">>
|
|
)
|
|
}
|
|
},
|
|
delete => #{
|
|
description => ?DESC(clean_authz_cache),
|
|
tags => ?TAGS,
|
|
parameters => [{clientid, hoconsc:mk(binary(), #{in => path})}],
|
|
responses => #{
|
|
204 => <<"Clean client authz cache successfully">>,
|
|
404 => emqx_dashboard_swagger:error_codes(
|
|
['CLIENTID_NOT_FOUND'], <<"Client ID not found">>
|
|
)
|
|
}
|
|
}
|
|
};
|
|
schema("/clients/:clientid/subscriptions") ->
|
|
#{
|
|
'operationId' => subscriptions,
|
|
get => #{
|
|
description => ?DESC(get_client_subs),
|
|
tags => ?TAGS,
|
|
parameters => [{clientid, hoconsc:mk(binary(), #{in => path})}],
|
|
responses => #{
|
|
200 => hoconsc:mk(
|
|
hoconsc:array(hoconsc:ref(emqx_mgmt_api_subscriptions, subscription)), #{}
|
|
),
|
|
404 => emqx_dashboard_swagger:error_codes(
|
|
['CLIENTID_NOT_FOUND'], <<"Client ID not found">>
|
|
)
|
|
}
|
|
}
|
|
};
|
|
schema("/clients/:clientid/subscribe") ->
|
|
#{
|
|
'operationId' => subscribe,
|
|
post => #{
|
|
description => ?DESC(subscribe),
|
|
tags => ?TAGS,
|
|
parameters => [{clientid, hoconsc:mk(binary(), #{in => path})}],
|
|
'requestBody' => hoconsc:mk(hoconsc:ref(?MODULE, subscribe)),
|
|
responses => #{
|
|
200 => hoconsc:ref(emqx_mgmt_api_subscriptions, subscription),
|
|
404 => emqx_dashboard_swagger:error_codes(
|
|
['CLIENTID_NOT_FOUND'], <<"Client ID not found">>
|
|
)
|
|
}
|
|
}
|
|
};
|
|
schema("/clients/:clientid/subscribe/bulk") ->
|
|
#{
|
|
'operationId' => subscribe_batch,
|
|
post => #{
|
|
description => ?DESC(subscribe_g),
|
|
tags => ?TAGS,
|
|
parameters => [{clientid, hoconsc:mk(binary(), #{in => path})}],
|
|
'requestBody' => hoconsc:mk(hoconsc:array(hoconsc:ref(?MODULE, subscribe))),
|
|
responses => #{
|
|
200 => hoconsc:array(hoconsc:ref(emqx_mgmt_api_subscriptions, subscription)),
|
|
404 => emqx_dashboard_swagger:error_codes(
|
|
['CLIENTID_NOT_FOUND'], <<"Client ID not found">>
|
|
)
|
|
}
|
|
}
|
|
};
|
|
schema("/clients/:clientid/unsubscribe") ->
|
|
#{
|
|
'operationId' => unsubscribe,
|
|
post => #{
|
|
description => ?DESC(unsubscribe),
|
|
tags => ?TAGS,
|
|
parameters => [{clientid, hoconsc:mk(binary(), #{in => path})}],
|
|
'requestBody' => hoconsc:mk(hoconsc:ref(?MODULE, unsubscribe)),
|
|
responses => #{
|
|
204 => <<"Unsubscribe OK">>,
|
|
404 => emqx_dashboard_swagger:error_codes(
|
|
['CLIENTID_NOT_FOUND'], <<"Client ID not found">>
|
|
)
|
|
}
|
|
}
|
|
};
|
|
schema("/clients/:clientid/unsubscribe/bulk") ->
|
|
#{
|
|
'operationId' => unsubscribe_batch,
|
|
post => #{
|
|
description => ?DESC(unsubscribe_g),
|
|
tags => ?TAGS,
|
|
parameters => [{clientid, hoconsc:mk(binary(), #{in => path})}],
|
|
'requestBody' => hoconsc:mk(hoconsc:array(hoconsc:ref(?MODULE, unsubscribe))),
|
|
responses => #{
|
|
204 => <<"Unsubscribe OK">>,
|
|
404 => emqx_dashboard_swagger:error_codes(
|
|
['CLIENTID_NOT_FOUND'], <<"Client ID not found">>
|
|
)
|
|
}
|
|
}
|
|
};
|
|
schema("/clients/:clientid/keepalive") ->
|
|
#{
|
|
'operationId' => set_keepalive,
|
|
put => #{
|
|
description => ?DESC(set_keepalive_seconds),
|
|
tags => ?TAGS,
|
|
hidden => true,
|
|
parameters => [{clientid, hoconsc:mk(binary(), #{in => path})}],
|
|
'requestBody' => hoconsc:mk(hoconsc:ref(?MODULE, keepalive)),
|
|
responses => #{
|
|
200 => emqx_dashboard_swagger:schema_with_example(
|
|
?R_REF(client),
|
|
client_example()
|
|
),
|
|
404 => emqx_dashboard_swagger:error_codes(
|
|
['CLIENTID_NOT_FOUND'], <<"Client ID not found">>
|
|
)
|
|
}
|
|
}
|
|
}.
|
|
|
|
fields(clients) ->
|
|
[
|
|
{data, hoconsc:mk(hoconsc:array(?REF(client)), #{})},
|
|
{meta, hoconsc:mk(hoconsc:ref(emqx_dashboard_swagger, meta), #{})}
|
|
];
|
|
fields(client) ->
|
|
[
|
|
{awaiting_rel_cnt,
|
|
hoconsc:mk(integer(), #{
|
|
desc =>
|
|
<<"v4 api name [awaiting_rel] Number of awaiting PUBREC packet">>
|
|
})},
|
|
{awaiting_rel_max,
|
|
hoconsc:mk(integer(), #{
|
|
desc =>
|
|
<<
|
|
"v4 api name [max_awaiting_rel]. "
|
|
"Maximum allowed number of awaiting PUBREC packet"
|
|
>>
|
|
})},
|
|
{clean_start,
|
|
hoconsc:mk(boolean(), #{
|
|
desc =>
|
|
<<"Indicate whether the client is using a brand new session">>
|
|
})},
|
|
{clientid, hoconsc:mk(binary(), #{desc => <<"Client identifier">>})},
|
|
{connected, hoconsc:mk(boolean(), #{desc => <<"Whether the client is connected">>})},
|
|
{connected_at,
|
|
hoconsc:mk(
|
|
emqx_utils_calendar:epoch_millisecond(),
|
|
#{desc => <<"Client connection time, rfc3339 or timestamp(millisecond)">>}
|
|
)},
|
|
{created_at,
|
|
hoconsc:mk(
|
|
emqx_utils_calendar:epoch_millisecond(),
|
|
#{desc => <<"Session creation time, rfc3339 or timestamp(millisecond)">>}
|
|
)},
|
|
{disconnected_at,
|
|
hoconsc:mk(emqx_utils_calendar:epoch_millisecond(), #{
|
|
desc =>
|
|
<<
|
|
"Client offline time."
|
|
" It's Only valid and returned when connected is false, rfc3339 or timestamp(millisecond)"
|
|
>>
|
|
})},
|
|
{expiry_interval,
|
|
hoconsc:mk(integer(), #{
|
|
desc =>
|
|
<<"Session expiration interval, with the unit of second">>
|
|
})},
|
|
{heap_size,
|
|
hoconsc:mk(integer(), #{
|
|
desc =>
|
|
<<"Process heap size with the unit of byte">>
|
|
})},
|
|
{inflight_cnt, hoconsc:mk(integer(), #{desc => <<"Current length of inflight">>})},
|
|
{inflight_max,
|
|
hoconsc:mk(integer(), #{
|
|
desc =>
|
|
<<"v4 api name [max_inflight]. Maximum length of inflight">>
|
|
})},
|
|
{ip_address, hoconsc:mk(binary(), #{desc => <<"Client's IP address">>})},
|
|
{is_bridge,
|
|
hoconsc:mk(boolean(), #{
|
|
desc =>
|
|
<<"Indicates whether the client is connected via bridge">>
|
|
})},
|
|
{keepalive,
|
|
hoconsc:mk(integer(), #{
|
|
desc =>
|
|
<<"keepalive time, with the unit of second">>
|
|
})},
|
|
{mailbox_len, hoconsc:mk(integer(), #{desc => <<"Process mailbox size">>})},
|
|
{mqueue_dropped,
|
|
hoconsc:mk(integer(), #{
|
|
desc =>
|
|
<<"Number of messages dropped by the message queue due to exceeding the length">>
|
|
})},
|
|
{mqueue_len, hoconsc:mk(integer(), #{desc => <<"Current length of message queue">>})},
|
|
{mqueue_max,
|
|
hoconsc:mk(integer(), #{
|
|
desc =>
|
|
<<"v4 api name [max_mqueue]. Maximum length of message queue">>
|
|
})},
|
|
{node,
|
|
hoconsc:mk(binary(), #{
|
|
desc =>
|
|
<<"Name of the node to which the client is connected">>
|
|
})},
|
|
{port, hoconsc:mk(integer(), #{desc => <<"Client's port">>})},
|
|
{proto_name, hoconsc:mk(binary(), #{desc => <<"Client protocol name">>})},
|
|
{proto_ver, hoconsc:mk(integer(), #{desc => <<"Protocol version used by the client">>})},
|
|
{recv_cnt, hoconsc:mk(integer(), #{desc => <<"Number of TCP packets received">>})},
|
|
{recv_msg, hoconsc:mk(integer(), #{desc => <<"Number of PUBLISH packets received">>})},
|
|
{'recv_msg.dropped',
|
|
hoconsc:mk(integer(), #{
|
|
desc =>
|
|
<<"Number of dropped PUBLISH packets">>
|
|
})},
|
|
{'recv_msg.dropped.await_pubrel_timeout',
|
|
hoconsc:mk(integer(), #{
|
|
desc =>
|
|
<<"Number of dropped PUBLISH packets due to expired">>
|
|
})},
|
|
{'recv_msg.qos0',
|
|
hoconsc:mk(integer(), #{
|
|
desc =>
|
|
<<"Number of PUBLISH QoS0 packets received">>
|
|
})},
|
|
{'recv_msg.qos1',
|
|
hoconsc:mk(integer(), #{
|
|
desc =>
|
|
<<"Number of PUBLISH QoS1 packets received">>
|
|
})},
|
|
{'recv_msg.qos2',
|
|
hoconsc:mk(integer(), #{
|
|
desc =>
|
|
<<"Number of PUBLISH QoS2 packets received">>
|
|
})},
|
|
{recv_oct, hoconsc:mk(integer(), #{desc => <<"Number of bytes received">>})},
|
|
{recv_pkt, hoconsc:mk(integer(), #{desc => <<"Number of MQTT packets received">>})},
|
|
{reductions, hoconsc:mk(integer(), #{desc => <<"Erlang reduction">>})},
|
|
{send_cnt, hoconsc:mk(integer(), #{desc => <<"Number of TCP packets sent">>})},
|
|
{send_msg, hoconsc:mk(integer(), #{desc => <<"Number of PUBLISH packets sent">>})},
|
|
{'send_msg.dropped',
|
|
hoconsc:mk(integer(), #{
|
|
desc =>
|
|
<<"Number of dropped PUBLISH packets">>
|
|
})},
|
|
{'send_msg.dropped.expired',
|
|
hoconsc:mk(integer(), #{
|
|
desc =>
|
|
<<"Number of dropped PUBLISH packets due to expired">>
|
|
})},
|
|
{'send_msg.dropped.queue_full',
|
|
hoconsc:mk(integer(), #{
|
|
desc =>
|
|
<<"Number of dropped PUBLISH packets due to queue full">>
|
|
})},
|
|
{'send_msg.dropped.too_large',
|
|
hoconsc:mk(integer(), #{
|
|
desc =>
|
|
<<"Number of dropped PUBLISH packets due to packet length too large">>
|
|
})},
|
|
{'send_msg.qos0',
|
|
hoconsc:mk(integer(), #{
|
|
desc =>
|
|
<<"Number of PUBLISH QoS0 packets sent">>
|
|
})},
|
|
{'send_msg.qos1',
|
|
hoconsc:mk(integer(), #{
|
|
desc =>
|
|
<<"Number of PUBLISH QoS1 packets sent">>
|
|
})},
|
|
{'send_msg.qos2',
|
|
hoconsc:mk(integer(), #{
|
|
desc =>
|
|
<<"Number of PUBLISH QoS2 packets sent">>
|
|
})},
|
|
{send_oct, hoconsc:mk(integer(), #{desc => <<"Number of bytes sent">>})},
|
|
{send_pkt, hoconsc:mk(integer(), #{desc => <<"Number of MQTT packets sent">>})},
|
|
{subscriptions_cnt,
|
|
hoconsc:mk(integer(), #{
|
|
desc =>
|
|
<<"Number of subscriptions established by this client.">>
|
|
})},
|
|
{subscriptions_max,
|
|
hoconsc:mk(integer(), #{
|
|
desc =>
|
|
<<"v4 api name [max_subscriptions]",
|
|
" Maximum number of subscriptions allowed by this client">>
|
|
})},
|
|
{username, hoconsc:mk(binary(), #{desc => <<"User name of client when connecting">>})},
|
|
{mountpoint, hoconsc:mk(binary(), #{desc => <<"Topic mountpoint">>})}
|
|
];
|
|
fields(authz_cache) ->
|
|
[
|
|
{access, hoconsc:mk(binary(), #{desc => <<"Access type">>, example => <<"publish">>})},
|
|
{result,
|
|
hoconsc:mk(hoconsc:enum([allow, denny]), #{
|
|
desc => <<"Allow or deny">>, example => <<"allow">>
|
|
})},
|
|
{topic, hoconsc:mk(binary(), #{desc => <<"Topic name">>, example => <<"testtopic/1">>})},
|
|
{updated_time,
|
|
hoconsc:mk(integer(), #{desc => <<"Update time">>, example => 1687850712989})}
|
|
];
|
|
fields(keepalive) ->
|
|
[
|
|
{interval,
|
|
hoconsc:mk(range(0, 65535), #{desc => <<"Keepalive time, with the unit of second">>})}
|
|
];
|
|
fields(subscribe) ->
|
|
[
|
|
{topic,
|
|
hoconsc:mk(binary(), #{
|
|
required => true, desc => <<"Topic">>, example => <<"testtopic/#">>
|
|
})},
|
|
{qos, hoconsc:mk(emqx_schema:qos(), #{default => 0, desc => <<"QoS">>})},
|
|
{nl, hoconsc:mk(integer(), #{default => 0, desc => <<"No Local">>})},
|
|
{rap, hoconsc:mk(integer(), #{default => 0, desc => <<"Retain as Published">>})},
|
|
{rh, hoconsc:mk(integer(), #{default => 0, desc => <<"Retain Handling">>})}
|
|
];
|
|
fields(unsubscribe) ->
|
|
[
|
|
{topic, hoconsc:mk(binary(), #{desc => <<"Topic">>, example => <<"testtopic/#">>})}
|
|
].
|
|
|
|
%%%==============================================================================================
|
|
%% parameters trans
|
|
clients(get, #{query_string := QString}) ->
|
|
list_clients(QString).
|
|
|
|
kickout_clients(post, #{body := ClientIDs}) ->
|
|
case emqx_mgmt:kickout_clients(ClientIDs) of
|
|
ok ->
|
|
{204};
|
|
{error, Reason} ->
|
|
Message = list_to_binary(io_lib:format("~p", [Reason])),
|
|
{500, #{code => <<"UNKNOWN_ERROR">>, message => Message}}
|
|
end.
|
|
|
|
client(get, #{bindings := Bindings}) ->
|
|
lookup(Bindings);
|
|
client(delete, #{bindings := Bindings}) ->
|
|
kickout(Bindings).
|
|
|
|
authz_cache(get, #{bindings := Bindings}) ->
|
|
get_authz_cache(Bindings);
|
|
authz_cache(delete, #{bindings := Bindings}) ->
|
|
clean_authz_cache(Bindings).
|
|
|
|
subscribe(post, #{bindings := #{clientid := ClientID}, body := TopicInfo}) ->
|
|
Opts = to_topic_info(TopicInfo),
|
|
subscribe(Opts#{clientid => ClientID}).
|
|
|
|
subscribe_batch(post, #{bindings := #{clientid := ClientID}, body := TopicInfos}) ->
|
|
Topics =
|
|
[
|
|
to_topic_info(TopicInfo)
|
|
|| TopicInfo <- TopicInfos
|
|
],
|
|
subscribe_batch(#{clientid => ClientID, topics => Topics}).
|
|
|
|
unsubscribe(post, #{bindings := #{clientid := ClientID}, body := TopicInfo}) ->
|
|
Topic = maps:get(<<"topic">>, TopicInfo),
|
|
unsubscribe(#{clientid => ClientID, topic => Topic}).
|
|
|
|
unsubscribe_batch(post, #{bindings := #{clientid := ClientID}, body := TopicInfos}) ->
|
|
Topics = [Topic || #{<<"topic">> := Topic} <- TopicInfos],
|
|
unsubscribe_batch(#{clientid => ClientID, topics => Topics}).
|
|
|
|
subscriptions(get, #{bindings := #{clientid := ClientID}}) ->
|
|
case emqx_mgmt:list_client_subscriptions(ClientID) of
|
|
{error, not_found} ->
|
|
{404, ?CLIENTID_NOT_FOUND};
|
|
[] ->
|
|
{200, []};
|
|
{Node, Subs} ->
|
|
Formatter =
|
|
fun(_Sub = {Topic, SubOpts}) ->
|
|
emqx_mgmt_api_subscriptions:format(Node, {{Topic, ClientID}, SubOpts})
|
|
end,
|
|
{200, lists:map(Formatter, Subs)}
|
|
end.
|
|
|
|
set_keepalive(put, #{bindings := #{clientid := ClientID}, body := Body}) ->
|
|
case maps:find(<<"interval">>, Body) of
|
|
error ->
|
|
{400, 'BAD_REQUEST', "Interval Not Found"};
|
|
{ok, Interval} ->
|
|
case emqx_mgmt:set_keepalive(ClientID, Interval) of
|
|
ok -> lookup(#{clientid => ClientID});
|
|
{error, not_found} -> {404, ?CLIENTID_NOT_FOUND};
|
|
{error, Reason} -> {400, #{code => 'PARAM_ERROR', message => Reason}}
|
|
end
|
|
end.
|
|
|
|
%%%==============================================================================================
|
|
%% api apply
|
|
|
|
list_clients(QString) ->
|
|
Result =
|
|
case maps:get(<<"node">>, QString, undefined) of
|
|
undefined ->
|
|
Options = #{fast_total_counting => true},
|
|
emqx_mgmt_api:cluster_query(
|
|
?CHAN_INFO_TAB,
|
|
QString,
|
|
?CLIENT_QSCHEMA,
|
|
fun ?MODULE:qs2ms/2,
|
|
fun ?MODULE:format_channel_info/2,
|
|
Options
|
|
);
|
|
Node0 ->
|
|
case emqx_utils:safe_to_existing_atom(Node0) of
|
|
{ok, Node1} ->
|
|
QStringWithoutNode = maps:without([<<"node">>], QString),
|
|
emqx_mgmt_api:node_query(
|
|
Node1,
|
|
?CHAN_INFO_TAB,
|
|
QStringWithoutNode,
|
|
?CLIENT_QSCHEMA,
|
|
fun ?MODULE:qs2ms/2,
|
|
fun ?MODULE:format_channel_info/2
|
|
);
|
|
{error, _} ->
|
|
{error, Node0, {badrpc, <<"invalid node">>}}
|
|
end
|
|
end,
|
|
case Result of
|
|
{error, page_limit_invalid} ->
|
|
{400, #{code => <<"INVALID_PARAMETER">>, message => <<"page_limit_invalid">>}};
|
|
{error, invalid_query_string_param, {Key, ExpectedType, AcutalValue}} ->
|
|
Message = list_to_binary(
|
|
io_lib:format(
|
|
"the ~s parameter expected type is ~s, but the value is ~s",
|
|
[Key, ExpectedType, emqx_utils_conv:str(AcutalValue)]
|
|
)
|
|
),
|
|
{400, #{code => <<"INVALID_PARAMETER">>, message => Message}};
|
|
{error, Node, {badrpc, R}} ->
|
|
Message = list_to_binary(io_lib:format("bad rpc call ~p, Reason ~p", [Node, R])),
|
|
{500, #{code => <<"NODE_DOWN">>, message => Message}};
|
|
Response ->
|
|
{200, Response}
|
|
end.
|
|
|
|
lookup(#{clientid := ClientID}) ->
|
|
case emqx_mgmt:lookup_client({clientid, ClientID}, ?FORMAT_FUN) of
|
|
[] ->
|
|
{404, ?CLIENTID_NOT_FOUND};
|
|
ClientInfo ->
|
|
{200, hd(ClientInfo)}
|
|
end.
|
|
|
|
kickout(#{clientid := ClientID}) ->
|
|
case emqx_mgmt:kickout_client(ClientID) of
|
|
{error, not_found} ->
|
|
{404, ?CLIENTID_NOT_FOUND};
|
|
_ ->
|
|
{204}
|
|
end.
|
|
|
|
get_authz_cache(#{clientid := ClientID}) ->
|
|
case emqx_mgmt:list_authz_cache(ClientID) of
|
|
{error, not_found} ->
|
|
{404, ?CLIENTID_NOT_FOUND};
|
|
{error, Reason} ->
|
|
Message = list_to_binary(io_lib:format("~p", [Reason])),
|
|
{500, #{code => <<"UNKNOWN_ERROR">>, message => Message}};
|
|
Caches ->
|
|
Response = [format_authz_cache(Cache) || Cache <- Caches],
|
|
{200, Response}
|
|
end.
|
|
|
|
clean_authz_cache(#{clientid := ClientID}) ->
|
|
case emqx_mgmt:clean_authz_cache(ClientID) of
|
|
ok ->
|
|
{204};
|
|
{error, not_found} ->
|
|
{404, ?CLIENTID_NOT_FOUND};
|
|
{error, Reason} ->
|
|
Message = list_to_binary(io_lib:format("~p", [Reason])),
|
|
{500, #{code => <<"UNKNOWN_ERROR">>, message => Message}}
|
|
end.
|
|
|
|
subscribe(#{clientid := ClientID, topic := Topic} = Sub) ->
|
|
Opts = maps:with([qos, nl, rap, rh], Sub),
|
|
case do_subscribe(ClientID, Topic, Opts) of
|
|
{error, channel_not_found} ->
|
|
{404, ?CLIENTID_NOT_FOUND};
|
|
{error, Reason} ->
|
|
Message = list_to_binary(io_lib:format("~p", [Reason])),
|
|
{500, #{code => <<"UNKNOWN_ERROR">>, message => Message}};
|
|
{ok, SubInfo} ->
|
|
{200, SubInfo}
|
|
end.
|
|
|
|
subscribe_batch(#{clientid := ClientID, topics := Topics}) ->
|
|
%% We use emqx_channel instead of emqx_channel_info (used by the emqx_mgmt:lookup_client/2),
|
|
%% as the emqx_channel_info table will only be populated after the hook `client.connected`
|
|
%% has returned. So if one want to subscribe topics in this hook, it will fail.
|
|
case ets:lookup(?CHAN_TAB, ClientID) of
|
|
[] ->
|
|
{404, ?CLIENTID_NOT_FOUND};
|
|
_ ->
|
|
ArgList = [
|
|
[ClientID, Topic, maps:with([qos, nl, rap, rh], Sub)]
|
|
|| #{topic := Topic} = Sub <- Topics
|
|
],
|
|
{200, emqx_mgmt_util:batch_operation(?MODULE, do_subscribe, ArgList)}
|
|
end.
|
|
|
|
unsubscribe(#{clientid := ClientID, topic := Topic}) ->
|
|
case do_unsubscribe(ClientID, Topic) of
|
|
{error, channel_not_found} ->
|
|
{404, ?CLIENTID_NOT_FOUND};
|
|
{unsubscribe, [{Topic, #{}}]} ->
|
|
{204}
|
|
end.
|
|
|
|
unsubscribe_batch(#{clientid := ClientID, topics := Topics}) ->
|
|
case lookup(#{clientid => ClientID}) of
|
|
{200, _} ->
|
|
_ = emqx_mgmt:unsubscribe_batch(ClientID, Topics),
|
|
{204};
|
|
{404, NotFound} ->
|
|
{404, NotFound}
|
|
end.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% internal function
|
|
|
|
do_subscribe(ClientID, Topic0, Options) ->
|
|
{Topic, Opts} = emqx_topic:parse(Topic0, Options),
|
|
TopicTable = [{Topic, Opts}],
|
|
case emqx_mgmt:subscribe(ClientID, TopicTable) of
|
|
{error, Reason} ->
|
|
{error, Reason};
|
|
{subscribe, Subscriptions, Node} ->
|
|
case proplists:is_defined(Topic, Subscriptions) of
|
|
true ->
|
|
{ok, Options#{node => Node, clientid => ClientID, topic => Topic}};
|
|
false ->
|
|
{error, unknow_error}
|
|
end
|
|
end.
|
|
|
|
-spec do_unsubscribe(emqx_types:clientid(), emqx_types:topic()) ->
|
|
{unsubscribe, _} | {error, channel_not_found}.
|
|
do_unsubscribe(ClientID, Topic) ->
|
|
case emqx_mgmt:unsubscribe(ClientID, Topic) of
|
|
{error, Reason} ->
|
|
{error, Reason};
|
|
Res ->
|
|
Res
|
|
end.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% QueryString to Match Spec
|
|
|
|
-spec qs2ms(atom(), {list(), list()}) -> emqx_mgmt_api:match_spec_and_filter().
|
|
qs2ms(_Tab, {QString, FuzzyQString}) ->
|
|
#{
|
|
match_spec => qs2ms(QString),
|
|
fuzzy_fun => fuzzy_filter_fun(FuzzyQString)
|
|
}.
|
|
|
|
-spec qs2ms(list()) -> ets:match_spec().
|
|
qs2ms(Qs) ->
|
|
{MtchHead, Conds} = qs2ms(Qs, 2, {#{}, []}),
|
|
[{{'$1', MtchHead, '_'}, Conds, ['$_']}].
|
|
|
|
qs2ms([], _, {MtchHead, Conds}) ->
|
|
{MtchHead, lists:reverse(Conds)};
|
|
qs2ms([{Key, '=:=', Value} | Rest], N, {MtchHead, Conds}) ->
|
|
NMtchHead = emqx_mgmt_util:merge_maps(MtchHead, ms(Key, Value)),
|
|
qs2ms(Rest, N, {NMtchHead, Conds});
|
|
qs2ms([Qs | Rest], N, {MtchHead, Conds}) ->
|
|
Holder = binary_to_atom(iolist_to_binary(["$", integer_to_list(N)]), utf8),
|
|
NMtchHead = emqx_mgmt_util:merge_maps(MtchHead, ms(element(1, Qs), Holder)),
|
|
NConds = put_conds(Qs, Holder, Conds),
|
|
qs2ms(Rest, N + 1, {NMtchHead, NConds}).
|
|
|
|
put_conds({_, Op, V}, Holder, Conds) ->
|
|
[{Op, Holder, V} | Conds];
|
|
put_conds({_, Op1, V1, Op2, V2}, Holder, Conds) ->
|
|
[
|
|
{Op2, Holder, V2},
|
|
{Op1, Holder, V1}
|
|
| Conds
|
|
].
|
|
|
|
ms(clientid, X) ->
|
|
#{clientinfo => #{clientid => X}};
|
|
ms(username, X) ->
|
|
#{clientinfo => #{username => X}};
|
|
ms(conn_state, X) ->
|
|
#{conn_state => X};
|
|
ms(ip_address, X) ->
|
|
#{conninfo => #{peername => {X, '_'}}};
|
|
ms(clean_start, X) ->
|
|
#{conninfo => #{clean_start => X}};
|
|
ms(proto_ver, X) ->
|
|
#{conninfo => #{proto_ver => X}};
|
|
ms(connected_at, X) ->
|
|
#{conninfo => #{connected_at => X}};
|
|
ms(created_at, X) ->
|
|
#{session => #{created_at => X}}.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Match funcs
|
|
|
|
fuzzy_filter_fun([]) ->
|
|
undefined;
|
|
fuzzy_filter_fun(Fuzzy) ->
|
|
{fun ?MODULE:run_fuzzy_filter/2, [Fuzzy]}.
|
|
|
|
run_fuzzy_filter(_, []) ->
|
|
true;
|
|
run_fuzzy_filter(E = {_, #{clientinfo := ClientInfo}, _}, [{Key, like, SubStr} | Fuzzy]) ->
|
|
Val =
|
|
case maps:get(Key, ClientInfo, <<>>) of
|
|
undefined -> <<>>;
|
|
V -> V
|
|
end,
|
|
binary:match(Val, SubStr) /= nomatch andalso run_fuzzy_filter(E, Fuzzy).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% format funcs
|
|
|
|
format_channel_info(ChannInfo = {_, _ClientInfo, _ClientStats}) ->
|
|
format_channel_info(node(), ChannInfo).
|
|
|
|
format_channel_info(WhichNode, {_, ClientInfo0, ClientStats}) ->
|
|
Node = maps:get(node, ClientInfo0, WhichNode),
|
|
ClientInfo1 = emqx_utils_maps:deep_remove([conninfo, clientid], ClientInfo0),
|
|
ClientInfo2 = emqx_utils_maps:deep_remove([conninfo, username], ClientInfo1),
|
|
StatsMap = maps:without(
|
|
[memory, next_pkt_id, total_heap_size],
|
|
maps:from_list(ClientStats)
|
|
),
|
|
ClientInfo3 = maps:remove(will_msg, ClientInfo2),
|
|
ClientInfoMap0 = maps:fold(fun take_maps_from_inner/3, #{}, ClientInfo3),
|
|
{IpAddress, Port} = peername_dispart(maps:get(peername, ClientInfoMap0)),
|
|
Connected = maps:get(conn_state, ClientInfoMap0) =:= connected,
|
|
ClientInfoMap1 = maps:merge(StatsMap, ClientInfoMap0),
|
|
ClientInfoMap2 = maps:put(node, Node, ClientInfoMap1),
|
|
ClientInfoMap3 = maps:put(ip_address, IpAddress, ClientInfoMap2),
|
|
ClientInfoMap4 = maps:put(port, Port, ClientInfoMap3),
|
|
ClientInfoMap5 = convert_expiry_interval_unit(ClientInfoMap4),
|
|
ClientInfoMap = maps:put(connected, Connected, ClientInfoMap5),
|
|
|
|
RemoveList =
|
|
[
|
|
auth_result,
|
|
peername,
|
|
sockname,
|
|
peerhost,
|
|
conn_state,
|
|
send_pend,
|
|
conn_props,
|
|
peercert,
|
|
sockstate,
|
|
subscriptions,
|
|
receive_maximum,
|
|
protocol,
|
|
is_superuser,
|
|
sockport,
|
|
anonymous,
|
|
socktype,
|
|
active_n,
|
|
await_rel_timeout,
|
|
conn_mod,
|
|
sockname,
|
|
retry_interval,
|
|
upgrade_qos,
|
|
zone,
|
|
%% session_id, defined in emqx_session.erl
|
|
id,
|
|
acl
|
|
],
|
|
TimesKeys = [created_at, connected_at, disconnected_at],
|
|
%% format timestamp to rfc3339
|
|
result_format_undefined_to_null(
|
|
lists:foldl(
|
|
fun result_format_time_fun/2,
|
|
maps:without(RemoveList, ClientInfoMap),
|
|
TimesKeys
|
|
)
|
|
).
|
|
|
|
%% format func helpers
|
|
take_maps_from_inner(_Key, Value, Current) when is_map(Value) ->
|
|
maps:merge(Current, Value);
|
|
take_maps_from_inner(Key, Value, Current) ->
|
|
maps:put(Key, Value, Current).
|
|
|
|
result_format_time_fun(Key, NClientInfoMap) ->
|
|
case NClientInfoMap of
|
|
#{Key := TimeStamp} ->
|
|
NClientInfoMap#{
|
|
Key => emqx_utils_calendar:epoch_to_rfc3339(TimeStamp)
|
|
};
|
|
#{} ->
|
|
NClientInfoMap
|
|
end.
|
|
|
|
result_format_undefined_to_null(Map) ->
|
|
maps:map(
|
|
fun
|
|
(_, undefined) -> null;
|
|
(_, V) -> V
|
|
end,
|
|
Map
|
|
).
|
|
|
|
-spec peername_dispart(emqx_types:peername()) -> {binary(), inet:port_number()}.
|
|
peername_dispart({Addr, Port}) ->
|
|
AddrBinary = list_to_binary(inet:ntoa(Addr)),
|
|
%% PortBinary = integer_to_binary(Port),
|
|
{AddrBinary, Port}.
|
|
|
|
convert_expiry_interval_unit(ClientInfoMap = #{expiry_interval := Interval}) ->
|
|
ClientInfoMap#{expiry_interval := Interval div 1000}.
|
|
|
|
format_authz_cache({{PubSub, Topic}, {AuthzResult, Timestamp}}) ->
|
|
#{
|
|
access => PubSub,
|
|
topic => Topic,
|
|
result => AuthzResult,
|
|
updated_time => Timestamp
|
|
}.
|
|
|
|
to_topic_info(Data) ->
|
|
M = maps:with([<<"topic">>, <<"qos">>, <<"nl">>, <<"rap">>, <<"rh">>], Data),
|
|
emqx_utils_maps:safe_atom_key_map(M).
|
|
|
|
client_example() ->
|
|
#{
|
|
<<"recv_oct">> => 49,
|
|
<<"expiry_interval">> => 0,
|
|
<<"created_at">> => <<"2024-01-01T12:34:56.789+08:00">>,
|
|
<<"awaiting_rel_max">> => 100,
|
|
<<"send_msg">> => 0,
|
|
<<"enable_authn">> => true,
|
|
<<"send_msg.qos2">> => 0,
|
|
<<"peerport">> => 52571,
|
|
<<"connected_at">> => <<"2024-01-01T12:34:56.789+08:00">>,
|
|
<<"send_msg.dropped.too_large">> => 0,
|
|
<<"inflight_cnt">> => 0,
|
|
<<"keepalive">> => 60,
|
|
<<"node">> => <<"emqx@127.0.0.1">>,
|
|
<<"send_cnt">> => 4,
|
|
<<"recv_msg.dropped.await_pubrel_timeout">> => 0,
|
|
<<"recv_msg.dropped">> => 0,
|
|
<<"inflight_max">> => 32,
|
|
<<"proto_name">> => <<"MQTT">>,
|
|
<<"send_msg.dropped.expired">> => 0,
|
|
<<"awaiting_rel_cnt">> => 0,
|
|
<<"mqueue_max">> => 1000,
|
|
<<"send_oct">> => 31,
|
|
<<"send_msg.dropped.queue_full">> => 0,
|
|
<<"mqueue_len">> => 0,
|
|
<<"heap_size">> => 610,
|
|
<<"is_persistent">> => false,
|
|
<<"send_msg.qos0">> => 0,
|
|
<<"clean_start">> => true,
|
|
<<"mountpoint">> => <<"null">>,
|
|
<<"proto_ver">> => 5,
|
|
<<"ip_address">> => <<"127.0.0.1">>,
|
|
<<"mqueue_dropped">> => 0,
|
|
<<"port">> => 52571,
|
|
<<"listener">> => <<"tcp:default">>,
|
|
<<"recv_msg.qos2">> => 0,
|
|
<<"recv_msg.qos1">> => 0,
|
|
<<"is_bridge">> => false,
|
|
<<"subscriptions_cnt">> => 1,
|
|
<<"username">> => null,
|
|
<<"send_msg.dropped">> => 0,
|
|
<<"send_pkt">> => 4,
|
|
<<"subscriptions_max">> => <<"infinity">>,
|
|
<<"send_msg.qos1">> => 0,
|
|
<<"connected">> => true,
|
|
<<"reductions">> => 6836,
|
|
<<"mailbox_len">> => 0,
|
|
<<"clientid">> => "01",
|
|
<<"recv_msg">> => 0,
|
|
<<"recv_pkt">> => 4,
|
|
<<"recv_cnt">> => 4,
|
|
<<"recv_msg.qos0">> => 0
|
|
}.
|