From 1e7872c319c56fcbe637a06e8a6e8e7c73a8f33a Mon Sep 17 00:00:00 2001 From: JianBo He Date: Tue, 30 May 2023 19:58:05 +0800 Subject: [PATCH 1/5] feat: support kickout clients in batch --- apps/emqx_management/src/emqx_mgmt.erl | 27 +++++++++++- .../src/emqx_mgmt_api_clients.erl | 26 ++++++++++++ .../src/proto/emqx_management_proto_v3.erl | 8 +++- .../test/emqx_mgmt_api_clients_SUITE.erl | 41 +++++++++++++++++++ rel/i18n/emqx_mgmt_api_clients.hocon | 5 +++ 5 files changed, 105 insertions(+), 2 deletions(-) diff --git a/apps/emqx_management/src/emqx_mgmt.erl b/apps/emqx_management/src/emqx_mgmt.erl index 9553730ec..dec340604 100644 --- a/apps/emqx_management/src/emqx_mgmt.erl +++ b/apps/emqx_management/src/emqx_mgmt.erl @@ -47,6 +47,7 @@ lookup_client/2, lookup_client/3, kickout_client/1, + kickout_clients/1, list_authz_cache/1, list_client_subscriptions/1, client_subscriptions/2, @@ -58,7 +59,9 @@ clean_pem_cache_all/1, set_ratelimit_policy/2, set_quota_policy/2, - set_keepalive/2 + set_keepalive/2, + + do_kickout_clients/1 ]). %% Internal functions @@ -321,6 +324,28 @@ kickout_client(ClientId) -> kickout_client(Node, ClientId) -> unwrap_rpc(emqx_cm_proto_v1:kickout_client(Node, ClientId)). +kickout_clients(ClientIds) when is_list(ClientIds) -> + F = fun(Node) -> + emqx_management_proto_v3:kickout_clients(Node, ClientIds) + end, + Results = lists:map(F, emqx:running_nodes()), + case lists:filter(fun(Res) -> Res =/= ok end, Results) of + [] -> + ok; + [Result | _] -> + unwrap_rpc(Result) + end. + +do_kickout_clients(ClientIds) when is_list(ClientIds) -> + F = fun(ClientId) -> + ChanPids = emqx_cm:lookup_channels(local, ClientId), + lists:foreach( + fun(ChanPid) -> emqx_cm:kick_session(ClientId, ChanPid) end, + ChanPids + ) + end, + lists:foreach(F, ClientIds). + list_authz_cache(ClientId) -> call_client(ClientId, list_authz_cache). diff --git a/apps/emqx_management/src/emqx_mgmt_api_clients.erl b/apps/emqx_management/src/emqx_mgmt_api_clients.erl index 7720ab66f..8c71acdcc 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_clients.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_clients.erl @@ -36,6 +36,7 @@ -export([ clients/2, + kickout_clients/2, client/2, subscriptions/2, authz_cache/2, @@ -88,6 +89,7 @@ api_spec() -> paths() -> [ "/clients", + "/clients/kickout/bulk", "/clients/:clientid", "/clients/:clientid/authorization/cache", "/clients/:clientid/subscriptions", @@ -211,6 +213,21 @@ schema("/clients") -> } } }; +schema("/clients/kickout/bulk") -> + #{ + 'operationId' => kickout_clients, + post => #{ + description => ?DESC(kickout_clients), + tags => ?TAGS, + 'requestBody' => hoconsc:mk( + hoconsc:array(binary()), + #{desc => <<"The list of Client IDs that need to be kicked out">>} + ), + responses => #{ + 204 => <<"Kick out clients successfully">> + } + } + }; schema("/clients/:clientid") -> #{ 'operationId' => client, @@ -568,6 +585,15 @@ fields(unsubscribe) -> clients(get, #{query_string := QString}) -> list_clients(QString). +kickout_clients(post, #{body := ClientIDs}) -> + case emqx_mgmt:kickout_clients(ClientIDs) of + ok -> + {204}; + {error, Reason} -> + Message = list_to_binary(io_lib:format("~p", [Reason])), + {500, #{code => <<"UNKNOW_ERROR">>, message => Message}} + end. + client(get, #{bindings := Bindings}) -> lookup(Bindings); client(delete, #{bindings := Bindings}) -> diff --git a/apps/emqx_management/src/proto/emqx_management_proto_v3.erl b/apps/emqx_management/src/proto/emqx_management_proto_v3.erl index 7ab06b99b..32278695c 100644 --- a/apps/emqx_management/src/proto/emqx_management_proto_v3.erl +++ b/apps/emqx_management/src/proto/emqx_management_proto_v3.erl @@ -32,7 +32,9 @@ call_client/3, - get_full_config/1 + get_full_config/1, + + kickout_clients/2 ]). -include_lib("emqx/include/bpapi.hrl"). @@ -78,3 +80,7 @@ call_client(Node, ClientId, Req) -> -spec get_full_config(node()) -> map() | list() | {badrpc, _}. get_full_config(Node) -> rpc:call(Node, emqx_mgmt_api_configs, get_full_config, []). + +-spec kickout_clients(node(), [emqx_types:clientid()]) -> ok | {badrpc, _}. +kickout_clients(Node, ClientIds) -> + rpc:call(Node, emqx_mgmt, do_kickout_clients, [ClientIds]). diff --git a/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl index 89838c346..f5895eb62 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl @@ -167,6 +167,47 @@ t_clients(_) -> AfterKickoutResponse1 = emqx_mgmt_api_test_util:request_api(get, Client1Path), ?assertEqual({error, {"HTTP/1.1", 404, "Not Found"}}, AfterKickoutResponse1). +t_kickout_clients(_) -> + process_flag(trap_exit, true), + + ClientId1 = <<"client1">>, + ClientId2 = <<"client2">>, + ClientId3 = <<"client3">>, + + {ok, C1} = emqtt:start_link(#{ + clientid => ClientId1, + proto_ver => v5, + properties => #{'Session-Expiry-Interval' => 120} + }), + {ok, _} = emqtt:connect(C1), + {ok, C2} = emqtt:start_link(#{clientid => ClientId2}), + {ok, _} = emqtt:connect(C2), + {ok, C3} = emqtt:start_link(#{clientid => ClientId3}), + {ok, _} = emqtt:connect(C3), + + timer:sleep(300), + + %% get /clients + ClientsPath = emqx_mgmt_api_test_util:api_path(["clients"]), + {ok, Clients} = emqx_mgmt_api_test_util:request_api(get, ClientsPath), + ClientsResponse = emqx_utils_json:decode(Clients, [return_maps]), + ClientsMeta = maps:get(<<"meta">>, ClientsResponse), + ClientsPage = maps:get(<<"page">>, ClientsMeta), + ClientsLimit = maps:get(<<"limit">>, ClientsMeta), + ClientsCount = maps:get(<<"count">>, ClientsMeta), + ?assertEqual(ClientsPage, 1), + ?assertEqual(ClientsLimit, emqx_mgmt:default_row_limit()), + ?assertEqual(ClientsCount, 3), + + %% kickout clients + KickoutPath = emqx_mgmt_api_test_util:api_path(["clients", "kickout", "bulk"]), + KickoutBody = [ClientId1, ClientId2, ClientId3], + {ok, _} = emqx_mgmt_api_test_util:request_api_with_body(post, KickoutPath, KickoutBody), + + {ok, Clients2} = emqx_mgmt_api_test_util:request_api(get, ClientsPath), + ClientsResponse2 = emqx_utils_json:decode(Clients2, [return_maps]), + ?assertMatch(#{<<"meta">> := #{<<"count">> := 0}}, ClientsResponse2). + t_query_clients_with_time(_) -> process_flag(trap_exit, true), diff --git a/rel/i18n/emqx_mgmt_api_clients.hocon b/rel/i18n/emqx_mgmt_api_clients.hocon index d82642b1e..64d4e5279 100644 --- a/rel/i18n/emqx_mgmt_api_clients.hocon +++ b/rel/i18n/emqx_mgmt_api_clients.hocon @@ -5,6 +5,11 @@ list_clients.desc: list_clients.label: """List clients""" +kickout_clients.desc: +"""Kick out a batch of client by client IDs""" +kickout_clients.label: +"""Kick out a batch of client by client IDs""" + clients_info_from_id.desc: """Get clients info by client ID""" clients_info_from_id.label: From e5534610e0280753fbcdb71bc26359630c2c9f79 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Tue, 30 May 2023 20:03:45 +0800 Subject: [PATCH 2/5] chore: update changes --- changes/ce/fix-10880.en.md | 1 + 1 file changed, 1 insertion(+) create mode 100644 changes/ce/fix-10880.en.md diff --git a/changes/ce/fix-10880.en.md b/changes/ce/fix-10880.en.md new file mode 100644 index 000000000..4bf3b7211 --- /dev/null +++ b/changes/ce/fix-10880.en.md @@ -0,0 +1 @@ +Add a new HTTP API endpoint `/clients/kickout/bulk` for kicking out multiple clients in bulk. From fc806c4acd92395af2b42eb901dbe7d20acff256 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Tue, 30 May 2023 20:15:49 +0800 Subject: [PATCH 3/5] test: fix failed tests --- apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl index f5895eb62..6e3768431 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl @@ -202,7 +202,7 @@ t_kickout_clients(_) -> %% kickout clients KickoutPath = emqx_mgmt_api_test_util:api_path(["clients", "kickout", "bulk"]), KickoutBody = [ClientId1, ClientId2, ClientId3], - {ok, _} = emqx_mgmt_api_test_util:request_api_with_body(post, KickoutPath, KickoutBody), + {ok, 204, _} = emqx_mgmt_api_test_util:request_api_with_body(post, KickoutPath, KickoutBody), {ok, Clients2} = emqx_mgmt_api_test_util:request_api(get, ClientsPath), ClientsResponse2 = emqx_utils_json:decode(Clients2, [return_maps]), From 48c53d8b327601c6a06d0fd6ac13644d24fbdd20 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Tue, 30 May 2023 21:46:33 +0800 Subject: [PATCH 4/5] chore: add emqx_management_proto_v4 --- .../src/emqx_management.app.src | 2 +- apps/emqx_management/src/emqx_mgmt.erl | 2 +- .../src/proto/emqx_management_proto_v3.erl | 8 +- .../src/proto/emqx_management_proto_v4.erl | 86 +++++++++++++++++++ 4 files changed, 89 insertions(+), 9 deletions(-) create mode 100644 apps/emqx_management/src/proto/emqx_management_proto_v4.erl diff --git a/apps/emqx_management/src/emqx_management.app.src b/apps/emqx_management/src/emqx_management.app.src index 31c719a33..4ee7dea10 100644 --- a/apps/emqx_management/src/emqx_management.app.src +++ b/apps/emqx_management/src/emqx_management.app.src @@ -2,7 +2,7 @@ {application, emqx_management, [ {description, "EMQX Management API and CLI"}, % strict semver, bump manually! - {vsn, "5.0.23"}, + {vsn, "5.0.24"}, {modules, []}, {registered, [emqx_management_sup]}, {applications, [kernel, stdlib, emqx_plugins, minirest, emqx, emqx_ctl]}, diff --git a/apps/emqx_management/src/emqx_mgmt.erl b/apps/emqx_management/src/emqx_mgmt.erl index dec340604..24a4c3fe4 100644 --- a/apps/emqx_management/src/emqx_mgmt.erl +++ b/apps/emqx_management/src/emqx_mgmt.erl @@ -326,7 +326,7 @@ kickout_client(Node, ClientId) -> kickout_clients(ClientIds) when is_list(ClientIds) -> F = fun(Node) -> - emqx_management_proto_v3:kickout_clients(Node, ClientIds) + emqx_management_proto_v4:kickout_clients(Node, ClientIds) end, Results = lists:map(F, emqx:running_nodes()), case lists:filter(fun(Res) -> Res =/= ok end, Results) of diff --git a/apps/emqx_management/src/proto/emqx_management_proto_v3.erl b/apps/emqx_management/src/proto/emqx_management_proto_v3.erl index 32278695c..7ab06b99b 100644 --- a/apps/emqx_management/src/proto/emqx_management_proto_v3.erl +++ b/apps/emqx_management/src/proto/emqx_management_proto_v3.erl @@ -32,9 +32,7 @@ call_client/3, - get_full_config/1, - - kickout_clients/2 + get_full_config/1 ]). -include_lib("emqx/include/bpapi.hrl"). @@ -80,7 +78,3 @@ call_client(Node, ClientId, Req) -> -spec get_full_config(node()) -> map() | list() | {badrpc, _}. get_full_config(Node) -> rpc:call(Node, emqx_mgmt_api_configs, get_full_config, []). - --spec kickout_clients(node(), [emqx_types:clientid()]) -> ok | {badrpc, _}. -kickout_clients(Node, ClientIds) -> - rpc:call(Node, emqx_mgmt, do_kickout_clients, [ClientIds]). diff --git a/apps/emqx_management/src/proto/emqx_management_proto_v4.erl b/apps/emqx_management/src/proto/emqx_management_proto_v4.erl new file mode 100644 index 000000000..0263202bb --- /dev/null +++ b/apps/emqx_management/src/proto/emqx_management_proto_v4.erl @@ -0,0 +1,86 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_management_proto_v4). + +-behaviour(emqx_bpapi). + +-export([ + introduced_in/0, + + node_info/1, + broker_info/1, + list_subscriptions/1, + + list_listeners/1, + subscribe/3, + unsubscribe/3, + unsubscribe_batch/3, + + call_client/3, + + get_full_config/1, + + kickout_clients/2 +]). + +-include_lib("emqx/include/bpapi.hrl"). + +introduced_in() -> + "5.1.0". + +-spec unsubscribe_batch(node(), emqx_types:clientid(), [emqx_types:topic()]) -> + {unsubscribe, _} | {error, _} | {badrpc, _}. +unsubscribe_batch(Node, ClientId, Topics) -> + rpc:call(Node, emqx_mgmt, do_unsubscribe_batch, [ClientId, Topics]). + +-spec node_info([node()]) -> emqx_rpc:erpc_multicall(map()). +node_info(Nodes) -> + erpc:multicall(Nodes, emqx_mgmt, node_info, [], 30000). + +-spec broker_info([node()]) -> emqx_rpc:erpc_multicall(map()). +broker_info(Nodes) -> + erpc:multicall(Nodes, emqx_mgmt, broker_info, [], 30000). + +-spec list_subscriptions(node()) -> [map()] | {badrpc, _}. +list_subscriptions(Node) -> + rpc:call(Node, emqx_mgmt, do_list_subscriptions, []). + +-spec list_listeners(node()) -> map() | {badrpc, _}. +list_listeners(Node) -> + rpc:call(Node, emqx_mgmt_api_listeners, do_list_listeners, []). + +-spec subscribe(node(), emqx_types:clientid(), emqx_types:topic_filters()) -> + {subscribe, _} | {error, atom()} | {badrpc, _}. +subscribe(Node, ClientId, TopicTables) -> + rpc:call(Node, emqx_mgmt, do_subscribe, [ClientId, TopicTables]). + +-spec unsubscribe(node(), emqx_types:clientid(), emqx_types:topic()) -> + {unsubscribe, _} | {error, _} | {badrpc, _}. +unsubscribe(Node, ClientId, Topic) -> + rpc:call(Node, emqx_mgmt, do_unsubscribe, [ClientId, Topic]). + +-spec call_client(node(), emqx_types:clientid(), term()) -> term(). +call_client(Node, ClientId, Req) -> + rpc:call(Node, emqx_mgmt, do_call_client, [ClientId, Req]). + +-spec get_full_config(node()) -> map() | list() | {badrpc, _}. +get_full_config(Node) -> + rpc:call(Node, emqx_mgmt_api_configs, get_full_config, []). + +-spec kickout_clients(node(), [emqx_types:clientid()]) -> ok | {badrpc, _}. +kickout_clients(Node, ClientIds) -> + rpc:call(Node, emqx_mgmt, do_kickout_clients, [ClientIds]). From afcd9fcb5eb22d272f8af6a6ab370cbcc45d9485 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Wed, 31 May 2023 11:29:55 +0800 Subject: [PATCH 5/5] chore: update bpapi.versions --- apps/emqx/priv/bpapi.versions | 1 + 1 file changed, 1 insertion(+) diff --git a/apps/emqx/priv/bpapi.versions b/apps/emqx/priv/bpapi.versions index 55fe1bf42..a86705eae 100644 --- a/apps/emqx/priv/bpapi.versions +++ b/apps/emqx/priv/bpapi.versions @@ -27,6 +27,7 @@ {emqx_management,1}. {emqx_management,2}. {emqx_management,3}. +{emqx_management,4}. {emqx_mgmt_api_plugins,1}. {emqx_mgmt_cluster,1}. {emqx_mgmt_trace,1}.