Merge pull request #10880 from HJianBo/batch-kickout-clients
feat: support kickout clients in batch
This commit is contained in:
commit
a3a6480f00
|
@ -27,6 +27,7 @@
|
|||
{emqx_management,1}.
|
||||
{emqx_management,2}.
|
||||
{emqx_management,3}.
|
||||
{emqx_management,4}.
|
||||
{emqx_mgmt_api_plugins,1}.
|
||||
{emqx_mgmt_cluster,1}.
|
||||
{emqx_mgmt_trace,1}.
|
||||
|
|
|
@ -2,7 +2,7 @@
|
|||
{application, emqx_management, [
|
||||
{description, "EMQX Management API and CLI"},
|
||||
% strict semver, bump manually!
|
||||
{vsn, "5.0.23"},
|
||||
{vsn, "5.0.24"},
|
||||
{modules, []},
|
||||
{registered, [emqx_management_sup]},
|
||||
{applications, [kernel, stdlib, emqx_plugins, minirest, emqx, emqx_ctl]},
|
||||
|
|
|
@ -47,6 +47,7 @@
|
|||
lookup_client/2,
|
||||
lookup_client/3,
|
||||
kickout_client/1,
|
||||
kickout_clients/1,
|
||||
list_authz_cache/1,
|
||||
list_client_subscriptions/1,
|
||||
client_subscriptions/2,
|
||||
|
@ -58,7 +59,9 @@
|
|||
clean_pem_cache_all/1,
|
||||
set_ratelimit_policy/2,
|
||||
set_quota_policy/2,
|
||||
set_keepalive/2
|
||||
set_keepalive/2,
|
||||
|
||||
do_kickout_clients/1
|
||||
]).
|
||||
|
||||
%% Internal functions
|
||||
|
@ -321,6 +324,28 @@ kickout_client(ClientId) ->
|
|||
kickout_client(Node, ClientId) ->
|
||||
unwrap_rpc(emqx_cm_proto_v1:kickout_client(Node, ClientId)).
|
||||
|
||||
kickout_clients(ClientIds) when is_list(ClientIds) ->
|
||||
F = fun(Node) ->
|
||||
emqx_management_proto_v4:kickout_clients(Node, ClientIds)
|
||||
end,
|
||||
Results = lists:map(F, emqx:running_nodes()),
|
||||
case lists:filter(fun(Res) -> Res =/= ok end, Results) of
|
||||
[] ->
|
||||
ok;
|
||||
[Result | _] ->
|
||||
unwrap_rpc(Result)
|
||||
end.
|
||||
|
||||
do_kickout_clients(ClientIds) when is_list(ClientIds) ->
|
||||
F = fun(ClientId) ->
|
||||
ChanPids = emqx_cm:lookup_channels(local, ClientId),
|
||||
lists:foreach(
|
||||
fun(ChanPid) -> emqx_cm:kick_session(ClientId, ChanPid) end,
|
||||
ChanPids
|
||||
)
|
||||
end,
|
||||
lists:foreach(F, ClientIds).
|
||||
|
||||
list_authz_cache(ClientId) ->
|
||||
call_client(ClientId, list_authz_cache).
|
||||
|
||||
|
|
|
@ -36,6 +36,7 @@
|
|||
|
||||
-export([
|
||||
clients/2,
|
||||
kickout_clients/2,
|
||||
client/2,
|
||||
subscriptions/2,
|
||||
authz_cache/2,
|
||||
|
@ -88,6 +89,7 @@ api_spec() ->
|
|||
paths() ->
|
||||
[
|
||||
"/clients",
|
||||
"/clients/kickout/bulk",
|
||||
"/clients/:clientid",
|
||||
"/clients/:clientid/authorization/cache",
|
||||
"/clients/:clientid/subscriptions",
|
||||
|
@ -211,6 +213,21 @@ schema("/clients") ->
|
|||
}
|
||||
}
|
||||
};
|
||||
schema("/clients/kickout/bulk") ->
|
||||
#{
|
||||
'operationId' => kickout_clients,
|
||||
post => #{
|
||||
description => ?DESC(kickout_clients),
|
||||
tags => ?TAGS,
|
||||
'requestBody' => hoconsc:mk(
|
||||
hoconsc:array(binary()),
|
||||
#{desc => <<"The list of Client IDs that need to be kicked out">>}
|
||||
),
|
||||
responses => #{
|
||||
204 => <<"Kick out clients successfully">>
|
||||
}
|
||||
}
|
||||
};
|
||||
schema("/clients/:clientid") ->
|
||||
#{
|
||||
'operationId' => client,
|
||||
|
@ -568,6 +585,15 @@ fields(unsubscribe) ->
|
|||
clients(get, #{query_string := QString}) ->
|
||||
list_clients(QString).
|
||||
|
||||
kickout_clients(post, #{body := ClientIDs}) ->
|
||||
case emqx_mgmt:kickout_clients(ClientIDs) of
|
||||
ok ->
|
||||
{204};
|
||||
{error, Reason} ->
|
||||
Message = list_to_binary(io_lib:format("~p", [Reason])),
|
||||
{500, #{code => <<"UNKNOW_ERROR">>, message => Message}}
|
||||
end.
|
||||
|
||||
client(get, #{bindings := Bindings}) ->
|
||||
lookup(Bindings);
|
||||
client(delete, #{bindings := Bindings}) ->
|
||||
|
|
|
@ -0,0 +1,86 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_management_proto_v4).
|
||||
|
||||
-behaviour(emqx_bpapi).
|
||||
|
||||
-export([
|
||||
introduced_in/0,
|
||||
|
||||
node_info/1,
|
||||
broker_info/1,
|
||||
list_subscriptions/1,
|
||||
|
||||
list_listeners/1,
|
||||
subscribe/3,
|
||||
unsubscribe/3,
|
||||
unsubscribe_batch/3,
|
||||
|
||||
call_client/3,
|
||||
|
||||
get_full_config/1,
|
||||
|
||||
kickout_clients/2
|
||||
]).
|
||||
|
||||
-include_lib("emqx/include/bpapi.hrl").
|
||||
|
||||
introduced_in() ->
|
||||
"5.1.0".
|
||||
|
||||
-spec unsubscribe_batch(node(), emqx_types:clientid(), [emqx_types:topic()]) ->
|
||||
{unsubscribe, _} | {error, _} | {badrpc, _}.
|
||||
unsubscribe_batch(Node, ClientId, Topics) ->
|
||||
rpc:call(Node, emqx_mgmt, do_unsubscribe_batch, [ClientId, Topics]).
|
||||
|
||||
-spec node_info([node()]) -> emqx_rpc:erpc_multicall(map()).
|
||||
node_info(Nodes) ->
|
||||
erpc:multicall(Nodes, emqx_mgmt, node_info, [], 30000).
|
||||
|
||||
-spec broker_info([node()]) -> emqx_rpc:erpc_multicall(map()).
|
||||
broker_info(Nodes) ->
|
||||
erpc:multicall(Nodes, emqx_mgmt, broker_info, [], 30000).
|
||||
|
||||
-spec list_subscriptions(node()) -> [map()] | {badrpc, _}.
|
||||
list_subscriptions(Node) ->
|
||||
rpc:call(Node, emqx_mgmt, do_list_subscriptions, []).
|
||||
|
||||
-spec list_listeners(node()) -> map() | {badrpc, _}.
|
||||
list_listeners(Node) ->
|
||||
rpc:call(Node, emqx_mgmt_api_listeners, do_list_listeners, []).
|
||||
|
||||
-spec subscribe(node(), emqx_types:clientid(), emqx_types:topic_filters()) ->
|
||||
{subscribe, _} | {error, atom()} | {badrpc, _}.
|
||||
subscribe(Node, ClientId, TopicTables) ->
|
||||
rpc:call(Node, emqx_mgmt, do_subscribe, [ClientId, TopicTables]).
|
||||
|
||||
-spec unsubscribe(node(), emqx_types:clientid(), emqx_types:topic()) ->
|
||||
{unsubscribe, _} | {error, _} | {badrpc, _}.
|
||||
unsubscribe(Node, ClientId, Topic) ->
|
||||
rpc:call(Node, emqx_mgmt, do_unsubscribe, [ClientId, Topic]).
|
||||
|
||||
-spec call_client(node(), emqx_types:clientid(), term()) -> term().
|
||||
call_client(Node, ClientId, Req) ->
|
||||
rpc:call(Node, emqx_mgmt, do_call_client, [ClientId, Req]).
|
||||
|
||||
-spec get_full_config(node()) -> map() | list() | {badrpc, _}.
|
||||
get_full_config(Node) ->
|
||||
rpc:call(Node, emqx_mgmt_api_configs, get_full_config, []).
|
||||
|
||||
-spec kickout_clients(node(), [emqx_types:clientid()]) -> ok | {badrpc, _}.
|
||||
kickout_clients(Node, ClientIds) ->
|
||||
rpc:call(Node, emqx_mgmt, do_kickout_clients, [ClientIds]).
|
|
@ -167,6 +167,47 @@ t_clients(_) ->
|
|||
AfterKickoutResponse1 = emqx_mgmt_api_test_util:request_api(get, Client1Path),
|
||||
?assertEqual({error, {"HTTP/1.1", 404, "Not Found"}}, AfterKickoutResponse1).
|
||||
|
||||
t_kickout_clients(_) ->
|
||||
process_flag(trap_exit, true),
|
||||
|
||||
ClientId1 = <<"client1">>,
|
||||
ClientId2 = <<"client2">>,
|
||||
ClientId3 = <<"client3">>,
|
||||
|
||||
{ok, C1} = emqtt:start_link(#{
|
||||
clientid => ClientId1,
|
||||
proto_ver => v5,
|
||||
properties => #{'Session-Expiry-Interval' => 120}
|
||||
}),
|
||||
{ok, _} = emqtt:connect(C1),
|
||||
{ok, C2} = emqtt:start_link(#{clientid => ClientId2}),
|
||||
{ok, _} = emqtt:connect(C2),
|
||||
{ok, C3} = emqtt:start_link(#{clientid => ClientId3}),
|
||||
{ok, _} = emqtt:connect(C3),
|
||||
|
||||
timer:sleep(300),
|
||||
|
||||
%% get /clients
|
||||
ClientsPath = emqx_mgmt_api_test_util:api_path(["clients"]),
|
||||
{ok, Clients} = emqx_mgmt_api_test_util:request_api(get, ClientsPath),
|
||||
ClientsResponse = emqx_utils_json:decode(Clients, [return_maps]),
|
||||
ClientsMeta = maps:get(<<"meta">>, ClientsResponse),
|
||||
ClientsPage = maps:get(<<"page">>, ClientsMeta),
|
||||
ClientsLimit = maps:get(<<"limit">>, ClientsMeta),
|
||||
ClientsCount = maps:get(<<"count">>, ClientsMeta),
|
||||
?assertEqual(ClientsPage, 1),
|
||||
?assertEqual(ClientsLimit, emqx_mgmt:default_row_limit()),
|
||||
?assertEqual(ClientsCount, 3),
|
||||
|
||||
%% kickout clients
|
||||
KickoutPath = emqx_mgmt_api_test_util:api_path(["clients", "kickout", "bulk"]),
|
||||
KickoutBody = [ClientId1, ClientId2, ClientId3],
|
||||
{ok, 204, _} = emqx_mgmt_api_test_util:request_api_with_body(post, KickoutPath, KickoutBody),
|
||||
|
||||
{ok, Clients2} = emqx_mgmt_api_test_util:request_api(get, ClientsPath),
|
||||
ClientsResponse2 = emqx_utils_json:decode(Clients2, [return_maps]),
|
||||
?assertMatch(#{<<"meta">> := #{<<"count">> := 0}}, ClientsResponse2).
|
||||
|
||||
t_query_clients_with_time(_) ->
|
||||
process_flag(trap_exit, true),
|
||||
|
||||
|
|
|
@ -0,0 +1 @@
|
|||
Add a new HTTP API endpoint `/clients/kickout/bulk` for kicking out multiple clients in bulk.
|
|
@ -5,6 +5,11 @@ list_clients.desc:
|
|||
list_clients.label:
|
||||
"""List clients"""
|
||||
|
||||
kickout_clients.desc:
|
||||
"""Kick out a batch of client by client IDs"""
|
||||
kickout_clients.label:
|
||||
"""Kick out a batch of client by client IDs"""
|
||||
|
||||
clients_info_from_id.desc:
|
||||
"""Get clients info by client ID"""
|
||||
clients_info_from_id.label:
|
||||
|
|
Loading…
Reference in New Issue