diff --git a/apps/emqx_management/src/emqx_mgmt.erl b/apps/emqx_management/src/emqx_mgmt.erl index 9553730ec..dec340604 100644 --- a/apps/emqx_management/src/emqx_mgmt.erl +++ b/apps/emqx_management/src/emqx_mgmt.erl @@ -47,6 +47,7 @@ lookup_client/2, lookup_client/3, kickout_client/1, + kickout_clients/1, list_authz_cache/1, list_client_subscriptions/1, client_subscriptions/2, @@ -58,7 +59,9 @@ clean_pem_cache_all/1, set_ratelimit_policy/2, set_quota_policy/2, - set_keepalive/2 + set_keepalive/2, + + do_kickout_clients/1 ]). %% Internal functions @@ -321,6 +324,28 @@ kickout_client(ClientId) -> kickout_client(Node, ClientId) -> unwrap_rpc(emqx_cm_proto_v1:kickout_client(Node, ClientId)). +kickout_clients(ClientIds) when is_list(ClientIds) -> + F = fun(Node) -> + emqx_management_proto_v3:kickout_clients(Node, ClientIds) + end, + Results = lists:map(F, emqx:running_nodes()), + case lists:filter(fun(Res) -> Res =/= ok end, Results) of + [] -> + ok; + [Result | _] -> + unwrap_rpc(Result) + end. + +do_kickout_clients(ClientIds) when is_list(ClientIds) -> + F = fun(ClientId) -> + ChanPids = emqx_cm:lookup_channels(local, ClientId), + lists:foreach( + fun(ChanPid) -> emqx_cm:kick_session(ClientId, ChanPid) end, + ChanPids + ) + end, + lists:foreach(F, ClientIds). + list_authz_cache(ClientId) -> call_client(ClientId, list_authz_cache). diff --git a/apps/emqx_management/src/emqx_mgmt_api_clients.erl b/apps/emqx_management/src/emqx_mgmt_api_clients.erl index 7720ab66f..8c71acdcc 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_clients.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_clients.erl @@ -36,6 +36,7 @@ -export([ clients/2, + kickout_clients/2, client/2, subscriptions/2, authz_cache/2, @@ -88,6 +89,7 @@ api_spec() -> paths() -> [ "/clients", + "/clients/kickout/bulk", "/clients/:clientid", "/clients/:clientid/authorization/cache", "/clients/:clientid/subscriptions", @@ -211,6 +213,21 @@ schema("/clients") -> } } }; +schema("/clients/kickout/bulk") -> + #{ + 'operationId' => kickout_clients, + post => #{ + description => ?DESC(kickout_clients), + tags => ?TAGS, + 'requestBody' => hoconsc:mk( + hoconsc:array(binary()), + #{desc => <<"The list of Client IDs that need to be kicked out">>} + ), + responses => #{ + 204 => <<"Kick out clients successfully">> + } + } + }; schema("/clients/:clientid") -> #{ 'operationId' => client, @@ -568,6 +585,15 @@ fields(unsubscribe) -> clients(get, #{query_string := QString}) -> list_clients(QString). +kickout_clients(post, #{body := ClientIDs}) -> + case emqx_mgmt:kickout_clients(ClientIDs) of + ok -> + {204}; + {error, Reason} -> + Message = list_to_binary(io_lib:format("~p", [Reason])), + {500, #{code => <<"UNKNOW_ERROR">>, message => Message}} + end. + client(get, #{bindings := Bindings}) -> lookup(Bindings); client(delete, #{bindings := Bindings}) -> diff --git a/apps/emqx_management/src/proto/emqx_management_proto_v3.erl b/apps/emqx_management/src/proto/emqx_management_proto_v3.erl index 7ab06b99b..32278695c 100644 --- a/apps/emqx_management/src/proto/emqx_management_proto_v3.erl +++ b/apps/emqx_management/src/proto/emqx_management_proto_v3.erl @@ -32,7 +32,9 @@ call_client/3, - get_full_config/1 + get_full_config/1, + + kickout_clients/2 ]). -include_lib("emqx/include/bpapi.hrl"). @@ -78,3 +80,7 @@ call_client(Node, ClientId, Req) -> -spec get_full_config(node()) -> map() | list() | {badrpc, _}. get_full_config(Node) -> rpc:call(Node, emqx_mgmt_api_configs, get_full_config, []). + +-spec kickout_clients(node(), [emqx_types:clientid()]) -> ok | {badrpc, _}. +kickout_clients(Node, ClientIds) -> + rpc:call(Node, emqx_mgmt, do_kickout_clients, [ClientIds]). diff --git a/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl index 89838c346..f5895eb62 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl @@ -167,6 +167,47 @@ t_clients(_) -> AfterKickoutResponse1 = emqx_mgmt_api_test_util:request_api(get, Client1Path), ?assertEqual({error, {"HTTP/1.1", 404, "Not Found"}}, AfterKickoutResponse1). +t_kickout_clients(_) -> + process_flag(trap_exit, true), + + ClientId1 = <<"client1">>, + ClientId2 = <<"client2">>, + ClientId3 = <<"client3">>, + + {ok, C1} = emqtt:start_link(#{ + clientid => ClientId1, + proto_ver => v5, + properties => #{'Session-Expiry-Interval' => 120} + }), + {ok, _} = emqtt:connect(C1), + {ok, C2} = emqtt:start_link(#{clientid => ClientId2}), + {ok, _} = emqtt:connect(C2), + {ok, C3} = emqtt:start_link(#{clientid => ClientId3}), + {ok, _} = emqtt:connect(C3), + + timer:sleep(300), + + %% get /clients + ClientsPath = emqx_mgmt_api_test_util:api_path(["clients"]), + {ok, Clients} = emqx_mgmt_api_test_util:request_api(get, ClientsPath), + ClientsResponse = emqx_utils_json:decode(Clients, [return_maps]), + ClientsMeta = maps:get(<<"meta">>, ClientsResponse), + ClientsPage = maps:get(<<"page">>, ClientsMeta), + ClientsLimit = maps:get(<<"limit">>, ClientsMeta), + ClientsCount = maps:get(<<"count">>, ClientsMeta), + ?assertEqual(ClientsPage, 1), + ?assertEqual(ClientsLimit, emqx_mgmt:default_row_limit()), + ?assertEqual(ClientsCount, 3), + + %% kickout clients + KickoutPath = emqx_mgmt_api_test_util:api_path(["clients", "kickout", "bulk"]), + KickoutBody = [ClientId1, ClientId2, ClientId3], + {ok, _} = emqx_mgmt_api_test_util:request_api_with_body(post, KickoutPath, KickoutBody), + + {ok, Clients2} = emqx_mgmt_api_test_util:request_api(get, ClientsPath), + ClientsResponse2 = emqx_utils_json:decode(Clients2, [return_maps]), + ?assertMatch(#{<<"meta">> := #{<<"count">> := 0}}, ClientsResponse2). + t_query_clients_with_time(_) -> process_flag(trap_exit, true), diff --git a/rel/i18n/emqx_mgmt_api_clients.hocon b/rel/i18n/emqx_mgmt_api_clients.hocon index d82642b1e..64d4e5279 100644 --- a/rel/i18n/emqx_mgmt_api_clients.hocon +++ b/rel/i18n/emqx_mgmt_api_clients.hocon @@ -5,6 +5,11 @@ list_clients.desc: list_clients.label: """List clients""" +kickout_clients.desc: +"""Kick out a batch of client by client IDs""" +kickout_clients.label: +"""Kick out a batch of client by client IDs""" + clients_info_from_id.desc: """Get clients info by client ID""" clients_info_from_id.label: