diff --git a/apps/emqx/priv/bpapi.versions b/apps/emqx/priv/bpapi.versions index 55fe1bf42..a86705eae 100644 --- a/apps/emqx/priv/bpapi.versions +++ b/apps/emqx/priv/bpapi.versions @@ -27,6 +27,7 @@ {emqx_management,1}. {emqx_management,2}. {emqx_management,3}. +{emqx_management,4}. {emqx_mgmt_api_plugins,1}. {emqx_mgmt_cluster,1}. {emqx_mgmt_trace,1}. diff --git a/apps/emqx_management/src/emqx_management.app.src b/apps/emqx_management/src/emqx_management.app.src index 31c719a33..4ee7dea10 100644 --- a/apps/emqx_management/src/emqx_management.app.src +++ b/apps/emqx_management/src/emqx_management.app.src @@ -2,7 +2,7 @@ {application, emqx_management, [ {description, "EMQX Management API and CLI"}, % strict semver, bump manually! - {vsn, "5.0.23"}, + {vsn, "5.0.24"}, {modules, []}, {registered, [emqx_management_sup]}, {applications, [kernel, stdlib, emqx_plugins, minirest, emqx, emqx_ctl]}, diff --git a/apps/emqx_management/src/emqx_mgmt.erl b/apps/emqx_management/src/emqx_mgmt.erl index 9553730ec..24a4c3fe4 100644 --- a/apps/emqx_management/src/emqx_mgmt.erl +++ b/apps/emqx_management/src/emqx_mgmt.erl @@ -47,6 +47,7 @@ lookup_client/2, lookup_client/3, kickout_client/1, + kickout_clients/1, list_authz_cache/1, list_client_subscriptions/1, client_subscriptions/2, @@ -58,7 +59,9 @@ clean_pem_cache_all/1, set_ratelimit_policy/2, set_quota_policy/2, - set_keepalive/2 + set_keepalive/2, + + do_kickout_clients/1 ]). %% Internal functions @@ -321,6 +324,28 @@ kickout_client(ClientId) -> kickout_client(Node, ClientId) -> unwrap_rpc(emqx_cm_proto_v1:kickout_client(Node, ClientId)). +kickout_clients(ClientIds) when is_list(ClientIds) -> + F = fun(Node) -> + emqx_management_proto_v4:kickout_clients(Node, ClientIds) + end, + Results = lists:map(F, emqx:running_nodes()), + case lists:filter(fun(Res) -> Res =/= ok end, Results) of + [] -> + ok; + [Result | _] -> + unwrap_rpc(Result) + end. + +do_kickout_clients(ClientIds) when is_list(ClientIds) -> + F = fun(ClientId) -> + ChanPids = emqx_cm:lookup_channels(local, ClientId), + lists:foreach( + fun(ChanPid) -> emqx_cm:kick_session(ClientId, ChanPid) end, + ChanPids + ) + end, + lists:foreach(F, ClientIds). + list_authz_cache(ClientId) -> call_client(ClientId, list_authz_cache). diff --git a/apps/emqx_management/src/emqx_mgmt_api_clients.erl b/apps/emqx_management/src/emqx_mgmt_api_clients.erl index 7720ab66f..8c71acdcc 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_clients.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_clients.erl @@ -36,6 +36,7 @@ -export([ clients/2, + kickout_clients/2, client/2, subscriptions/2, authz_cache/2, @@ -88,6 +89,7 @@ api_spec() -> paths() -> [ "/clients", + "/clients/kickout/bulk", "/clients/:clientid", "/clients/:clientid/authorization/cache", "/clients/:clientid/subscriptions", @@ -211,6 +213,21 @@ schema("/clients") -> } } }; +schema("/clients/kickout/bulk") -> + #{ + 'operationId' => kickout_clients, + post => #{ + description => ?DESC(kickout_clients), + tags => ?TAGS, + 'requestBody' => hoconsc:mk( + hoconsc:array(binary()), + #{desc => <<"The list of Client IDs that need to be kicked out">>} + ), + responses => #{ + 204 => <<"Kick out clients successfully">> + } + } + }; schema("/clients/:clientid") -> #{ 'operationId' => client, @@ -568,6 +585,15 @@ fields(unsubscribe) -> clients(get, #{query_string := QString}) -> list_clients(QString). +kickout_clients(post, #{body := ClientIDs}) -> + case emqx_mgmt:kickout_clients(ClientIDs) of + ok -> + {204}; + {error, Reason} -> + Message = list_to_binary(io_lib:format("~p", [Reason])), + {500, #{code => <<"UNKNOW_ERROR">>, message => Message}} + end. + client(get, #{bindings := Bindings}) -> lookup(Bindings); client(delete, #{bindings := Bindings}) -> diff --git a/apps/emqx_management/src/proto/emqx_management_proto_v4.erl b/apps/emqx_management/src/proto/emqx_management_proto_v4.erl new file mode 100644 index 000000000..0263202bb --- /dev/null +++ b/apps/emqx_management/src/proto/emqx_management_proto_v4.erl @@ -0,0 +1,86 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_management_proto_v4). + +-behaviour(emqx_bpapi). + +-export([ + introduced_in/0, + + node_info/1, + broker_info/1, + list_subscriptions/1, + + list_listeners/1, + subscribe/3, + unsubscribe/3, + unsubscribe_batch/3, + + call_client/3, + + get_full_config/1, + + kickout_clients/2 +]). + +-include_lib("emqx/include/bpapi.hrl"). + +introduced_in() -> + "5.1.0". + +-spec unsubscribe_batch(node(), emqx_types:clientid(), [emqx_types:topic()]) -> + {unsubscribe, _} | {error, _} | {badrpc, _}. +unsubscribe_batch(Node, ClientId, Topics) -> + rpc:call(Node, emqx_mgmt, do_unsubscribe_batch, [ClientId, Topics]). + +-spec node_info([node()]) -> emqx_rpc:erpc_multicall(map()). +node_info(Nodes) -> + erpc:multicall(Nodes, emqx_mgmt, node_info, [], 30000). + +-spec broker_info([node()]) -> emqx_rpc:erpc_multicall(map()). +broker_info(Nodes) -> + erpc:multicall(Nodes, emqx_mgmt, broker_info, [], 30000). + +-spec list_subscriptions(node()) -> [map()] | {badrpc, _}. +list_subscriptions(Node) -> + rpc:call(Node, emqx_mgmt, do_list_subscriptions, []). + +-spec list_listeners(node()) -> map() | {badrpc, _}. +list_listeners(Node) -> + rpc:call(Node, emqx_mgmt_api_listeners, do_list_listeners, []). + +-spec subscribe(node(), emqx_types:clientid(), emqx_types:topic_filters()) -> + {subscribe, _} | {error, atom()} | {badrpc, _}. +subscribe(Node, ClientId, TopicTables) -> + rpc:call(Node, emqx_mgmt, do_subscribe, [ClientId, TopicTables]). + +-spec unsubscribe(node(), emqx_types:clientid(), emqx_types:topic()) -> + {unsubscribe, _} | {error, _} | {badrpc, _}. +unsubscribe(Node, ClientId, Topic) -> + rpc:call(Node, emqx_mgmt, do_unsubscribe, [ClientId, Topic]). + +-spec call_client(node(), emqx_types:clientid(), term()) -> term(). +call_client(Node, ClientId, Req) -> + rpc:call(Node, emqx_mgmt, do_call_client, [ClientId, Req]). + +-spec get_full_config(node()) -> map() | list() | {badrpc, _}. +get_full_config(Node) -> + rpc:call(Node, emqx_mgmt_api_configs, get_full_config, []). + +-spec kickout_clients(node(), [emqx_types:clientid()]) -> ok | {badrpc, _}. +kickout_clients(Node, ClientIds) -> + rpc:call(Node, emqx_mgmt, do_kickout_clients, [ClientIds]). diff --git a/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl index 89838c346..6e3768431 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl @@ -167,6 +167,47 @@ t_clients(_) -> AfterKickoutResponse1 = emqx_mgmt_api_test_util:request_api(get, Client1Path), ?assertEqual({error, {"HTTP/1.1", 404, "Not Found"}}, AfterKickoutResponse1). +t_kickout_clients(_) -> + process_flag(trap_exit, true), + + ClientId1 = <<"client1">>, + ClientId2 = <<"client2">>, + ClientId3 = <<"client3">>, + + {ok, C1} = emqtt:start_link(#{ + clientid => ClientId1, + proto_ver => v5, + properties => #{'Session-Expiry-Interval' => 120} + }), + {ok, _} = emqtt:connect(C1), + {ok, C2} = emqtt:start_link(#{clientid => ClientId2}), + {ok, _} = emqtt:connect(C2), + {ok, C3} = emqtt:start_link(#{clientid => ClientId3}), + {ok, _} = emqtt:connect(C3), + + timer:sleep(300), + + %% get /clients + ClientsPath = emqx_mgmt_api_test_util:api_path(["clients"]), + {ok, Clients} = emqx_mgmt_api_test_util:request_api(get, ClientsPath), + ClientsResponse = emqx_utils_json:decode(Clients, [return_maps]), + ClientsMeta = maps:get(<<"meta">>, ClientsResponse), + ClientsPage = maps:get(<<"page">>, ClientsMeta), + ClientsLimit = maps:get(<<"limit">>, ClientsMeta), + ClientsCount = maps:get(<<"count">>, ClientsMeta), + ?assertEqual(ClientsPage, 1), + ?assertEqual(ClientsLimit, emqx_mgmt:default_row_limit()), + ?assertEqual(ClientsCount, 3), + + %% kickout clients + KickoutPath = emqx_mgmt_api_test_util:api_path(["clients", "kickout", "bulk"]), + KickoutBody = [ClientId1, ClientId2, ClientId3], + {ok, 204, _} = emqx_mgmt_api_test_util:request_api_with_body(post, KickoutPath, KickoutBody), + + {ok, Clients2} = emqx_mgmt_api_test_util:request_api(get, ClientsPath), + ClientsResponse2 = emqx_utils_json:decode(Clients2, [return_maps]), + ?assertMatch(#{<<"meta">> := #{<<"count">> := 0}}, ClientsResponse2). + t_query_clients_with_time(_) -> process_flag(trap_exit, true), diff --git a/changes/ce/fix-10880.en.md b/changes/ce/fix-10880.en.md new file mode 100644 index 000000000..4bf3b7211 --- /dev/null +++ b/changes/ce/fix-10880.en.md @@ -0,0 +1 @@ +Add a new HTTP API endpoint `/clients/kickout/bulk` for kicking out multiple clients in bulk. diff --git a/rel/i18n/emqx_mgmt_api_clients.hocon b/rel/i18n/emqx_mgmt_api_clients.hocon index d82642b1e..64d4e5279 100644 --- a/rel/i18n/emqx_mgmt_api_clients.hocon +++ b/rel/i18n/emqx_mgmt_api_clients.hocon @@ -5,6 +5,11 @@ list_clients.desc: list_clients.label: """List clients""" +kickout_clients.desc: +"""Kick out a batch of client by client IDs""" +kickout_clients.label: +"""Kick out a batch of client by client IDs""" + clients_info_from_id.desc: """Get clients info by client ID""" clients_info_from_id.label: