diff --git a/apps/emqx/priv/bpapi.versions b/apps/emqx/priv/bpapi.versions index e13f60654..6ef46f477 100644 --- a/apps/emqx/priv/bpapi.versions +++ b/apps/emqx/priv/bpapi.versions @@ -15,7 +15,9 @@ {emqx_conf,3}. {emqx_dashboard,1}. {emqx_delayed,1}. +{emqx_delayed,2}. {emqx_eviction_agent,1}. +{emqx_eviction_agent,2}. {emqx_exhook,1}. {emqx_ft_storage_exporter_fs,1}. {emqx_ft_storage_fs,1}. @@ -37,9 +39,13 @@ {emqx_mgmt_trace,1}. {emqx_mgmt_trace,2}. {emqx_node_rebalance,1}. +{emqx_node_rebalance,2}. {emqx_node_rebalance_api,1}. +{emqx_node_rebalance_api,2}. {emqx_node_rebalance_evacuation,1}. +{emqx_node_rebalance_purge,1}. {emqx_node_rebalance_status,1}. +{emqx_node_rebalance_status,2}. {emqx_persistent_session,1}. {emqx_plugins,1}. {emqx_prometheus,1}. diff --git a/apps/emqx/src/emqx_cm.erl b/apps/emqx/src/emqx_cm.erl index a78ba752e..e3743486c 100644 --- a/apps/emqx/src/emqx_cm.erl +++ b/apps/emqx/src/emqx_cm.erl @@ -75,6 +75,7 @@ %% Client management -export([ + all_channels_table/1, channel_with_session_table/1, live_connection_table/1 ]). @@ -582,6 +583,26 @@ channel_with_session_table(ConnModuleList) -> sets:is_element(ConnModule, ConnModules) ]). +%% @doc Get clientinfo for all clients, regardless if they use clean start or not. +all_channels_table(ConnModuleList) -> + Ms = ets:fun2ms( + fun({{ClientId, _ChanPid}, Info, _Stats}) -> + {ClientId, Info} + end + ), + Table = ets:table(?CHAN_INFO_TAB, [{traverse, {select, Ms}}]), + ConnModules = sets:from_list(ConnModuleList, [{version, 2}]), + qlc:q([ + {ClientId, ConnState, ConnInfo, ClientInfo} + || {ClientId, #{ + conn_state := ConnState, + clientinfo := ClientInfo, + conninfo := #{conn_mod := ConnModule} = ConnInfo + }} <- + Table, + sets:is_element(ConnModule, ConnModules) + ]). + %% @doc Get all local connection query handle live_connection_table(ConnModules) -> Ms = lists:map(fun live_connection_ms/1, ConnModules), diff --git a/apps/emqx_eviction_agent/src/emqx_eviction_agent.app.src b/apps/emqx_eviction_agent/src/emqx_eviction_agent.app.src index f9f6334c3..c11f52fe7 100644 --- a/apps/emqx_eviction_agent/src/emqx_eviction_agent.app.src +++ b/apps/emqx_eviction_agent/src/emqx_eviction_agent.app.src @@ -1,6 +1,6 @@ {application, emqx_eviction_agent, [ {description, "EMQX Eviction Agent"}, - {vsn, "5.1.0"}, + {vsn, "5.1.1"}, {registered, [ emqx_eviction_agent_sup, emqx_eviction_agent, diff --git a/apps/emqx_eviction_agent/src/emqx_eviction_agent.erl b/apps/emqx_eviction_agent/src/emqx_eviction_agent.erl index 9a29adc69..ab2b9e66a 100644 --- a/apps/emqx_eviction_agent/src/emqx_eviction_agent.erl +++ b/apps/emqx_eviction_agent/src/emqx_eviction_agent.erl @@ -18,14 +18,19 @@ disable/1, status/0, connection_count/0, + all_channels_count/0, session_count/0, session_count/1, evict_connections/1, evict_sessions/2, evict_sessions/3, + purge_sessions/1, evict_session_channel/3 ]). +%% RPC targets +-export([all_local_channels_count/0]). + -behaviour(gen_server). -export([ @@ -113,6 +118,14 @@ evict_sessions(N, Nodes, ConnState) when {error, disabled} end. +purge_sessions(N) -> + case enable_status() of + {enabled, _Kind, _ServerReference} -> + ok = do_purge_sessions(N); + disabled -> + {error, disabled} + end. + %%-------------------------------------------------------------------- %% gen_server callbacks %%-------------------------------------------------------------------- @@ -231,6 +244,33 @@ channel_with_session_table(RequiredConnState) -> RequiredConnState =:= ConnState ]). +-spec all_channels_count() -> non_neg_integer(). +all_channels_count() -> + Nodes = emqx:running_nodes(), + Timeout = 15_000, + Results = emqx_eviction_agent_proto_v2:all_channels_count(Nodes, Timeout), + NodeResults = lists:zip(Nodes, Results), + Errors = lists:filter( + fun + ({_Node, {ok, _}}) -> false; + ({_Node, _Err}) -> true + end, + NodeResults + ), + Errors =/= [] andalso + ?SLOG( + warning, + #{ + msg => "error_collecting_all_channels_count", + errors => maps:from_list(Errors) + } + ), + lists:sum([N || {ok, N} <- Results]). + +-spec all_local_channels_count() -> non_neg_integer(). +all_local_channels_count() -> + table_count(emqx_cm:all_channels_table(?CONN_MODULES)). + session_count() -> session_count(any). @@ -247,6 +287,17 @@ take_connections(N) -> ok = qlc:delete_cursor(ChanPidCursor), ChanPids. +take_channels(N) -> + QH = qlc:q([ + {ClientId, ConnInfo, ClientInfo} + || {ClientId, _, ConnInfo, ClientInfo} <- + emqx_cm:all_channels_table(?CONN_MODULES) + ]), + ChanPidCursor = qlc:cursor(QH), + Channels = qlc:next_answers(ChanPidCursor, N), + ok = qlc:delete_cursor(ChanPidCursor), + Channels. + take_channel_with_sessions(N, ConnState) -> ChanPidCursor = qlc:cursor(channel_with_session_table(ConnState)), Channels = qlc:next_answers(ChanPidCursor, N), @@ -283,7 +334,7 @@ evict_session_channel(Nodes, ClientId, ConnInfo, ClientInfo) -> client_info => ClientInfo } ), - case emqx_eviction_agent_proto_v1:evict_session_channel(Node, ClientId, ConnInfo, ClientInfo) of + case emqx_eviction_agent_proto_v2:evict_session_channel(Node, ClientId, ConnInfo, ClientInfo) of {badrpc, Reason} -> ?SLOG( error, @@ -344,5 +395,14 @@ disconnect_channel(ChanPid, ServerReference) -> 'Server-Reference' => ServerReference }}. +do_purge_sessions(N) when N > 0 -> + Channels = take_channels(N), + ok = lists:foreach( + fun({ClientId, _ConnInfo, _ClientInfo}) -> + emqx_cm:discard_session(ClientId) + end, + Channels + ). + select_random(List) when length(List) > 0 -> lists:nth(rand:uniform(length(List)), List). diff --git a/apps/emqx_eviction_agent/src/proto/emqx_eviction_agent_proto_v2.erl b/apps/emqx_eviction_agent/src/proto/emqx_eviction_agent_proto_v2.erl new file mode 100644 index 000000000..2d204079c --- /dev/null +++ b/apps/emqx_eviction_agent/src/proto/emqx_eviction_agent_proto_v2.erl @@ -0,0 +1,35 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_eviction_agent_proto_v2). + +-behaviour(emqx_bpapi). + +-export([ + introduced_in/0, + + evict_session_channel/4, + + %% Introduced in v2: + all_channels_count/2 +]). + +-include_lib("emqx/include/bpapi.hrl"). + +introduced_in() -> + "5.2.1". + +-spec evict_session_channel( + node(), + emqx_types:clientid(), + emqx_types:conninfo(), + emqx_types:clientinfo() +) -> supervisor:startchild_err() | emqx_rpc:badrpc(). +evict_session_channel(Node, ClientId, ConnInfo, ClientInfo) -> + rpc:call(Node, emqx_eviction_agent, evict_session_channel, [ClientId, ConnInfo, ClientInfo]). + +%% Introduced in v2: +-spec all_channels_count([node()], time:time()) -> emqx_rpc:erpc_multicall(non_neg_integer()). +all_channels_count(Nodes, Timeout) -> + erpc:multicall(Nodes, emqx_eviction_agent, all_local_channels_count, [], Timeout). diff --git a/apps/emqx_eviction_agent/test/emqx_eviction_agent_test_helpers.erl b/apps/emqx_eviction_agent/test/emqx_eviction_agent_test_helpers.erl index 7425cb145..130a2628a 100644 --- a/apps/emqx_eviction_agent/test/emqx_eviction_agent_test_helpers.erl +++ b/apps/emqx_eviction_agent/test/emqx_eviction_agent_test_helpers.erl @@ -9,6 +9,7 @@ emqtt_connect/1, emqtt_connect/2, emqtt_connect_many/2, + emqtt_connect_many/3, stop_many/1, emqtt_try_connect/1, @@ -42,6 +43,9 @@ emqtt_connect(Opts) -> end. emqtt_connect_many(Port, Count) -> + emqtt_connect_many(Port, Count, _StartN = 1). + +emqtt_connect_many(Port, Count, StartN) -> lists:map( fun(N) -> NBin = integer_to_binary(N), @@ -49,7 +53,7 @@ emqtt_connect_many(Port, Count) -> {ok, C} = emqtt_connect([{clientid, ClientId}, {clean_start, false}, {port, Port}]), C end, - lists:seq(1, Count) + lists:seq(StartN, StartN + Count - 1) ). stop_many(Clients) -> diff --git a/apps/emqx_modules/src/emqx_delayed.erl b/apps/emqx_modules/src/emqx_delayed.erl index 559648bdd..22d18c180 100644 --- a/apps/emqx_modules/src/emqx_delayed.erl +++ b/apps/emqx_modules/src/emqx_delayed.erl @@ -45,18 +45,22 @@ code_change/3 ]). -%% gen_server callbacks +%% API -export([ load/0, unload/0, load_or_unload/1, get_conf/1, update_config/1, + delayed_count/0, list/1, get_delayed_message/1, get_delayed_message/2, delete_delayed_message/1, delete_delayed_message/2, + clear_all/0, + %% rpc target + clear_all_local/0, cluster_list/1 ]). @@ -167,6 +171,9 @@ unload() -> load_or_unload(Bool) -> gen_server:call(?SERVER, {do_load_or_unload, Bool}). +-spec delayed_count() -> non_neg_integer(). +delayed_count() -> mnesia:table_info(?TAB, size). + list(Params) -> emqx_mgmt_api:paginate(?TAB, Params, ?FORMAT_FUN). @@ -243,7 +250,7 @@ get_delayed_message(Id) -> get_delayed_message(Node, Id) when Node =:= node() -> get_delayed_message(Id); get_delayed_message(Node, Id) -> - emqx_delayed_proto_v1:get_delayed_message(Node, Id). + emqx_delayed_proto_v2:get_delayed_message(Node, Id). -spec delete_delayed_message(binary()) -> with_id_return(). delete_delayed_message(Id) -> @@ -258,7 +265,19 @@ delete_delayed_message(Id) -> delete_delayed_message(Node, Id) when Node =:= node() -> delete_delayed_message(Id); delete_delayed_message(Node, Id) -> - emqx_delayed_proto_v1:delete_delayed_message(Node, Id). + emqx_delayed_proto_v2:delete_delayed_message(Node, Id). + +-spec clear_all() -> ok. +clear_all() -> + Nodes = emqx:running_nodes(), + _ = emqx_delayed_proto_v2:clear_all(Nodes), + ok. + +%% rpc target +-spec clear_all_local() -> ok. +clear_all_local() -> + _ = mria:clear_table(?TAB), + ok. update_config(Config) -> emqx_conf:update([delayed], Config, #{rawconf_with_defaults => true, override_to => cluster}). @@ -408,9 +427,6 @@ do_publish(Key = {Ts, _Id}, Now, Acc) when Ts =< Now -> end, do_publish(mnesia:dirty_next(?TAB, Key), Now, [Key | Acc]). --spec delayed_count() -> non_neg_integer(). -delayed_count() -> mnesia:table_info(?TAB, size). - do_load_or_unload(true, State) -> emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}, ?HP_DELAY_PUB), State; diff --git a/apps/emqx_modules/src/proto/emqx_delayed_proto_v2.erl b/apps/emqx_modules/src/proto/emqx_delayed_proto_v2.erl new file mode 100644 index 000000000..6d96dcc66 --- /dev/null +++ b/apps/emqx_modules/src/proto/emqx_delayed_proto_v2.erl @@ -0,0 +1,48 @@ +%%-------------------------------------------------------------------- +%%Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_delayed_proto_v2). + +-behaviour(emqx_bpapi). + +-export([ + introduced_in/0, + get_delayed_message/2, + delete_delayed_message/2, + + %% Introduced in v2: + clear_all/1 +]). + +-include_lib("emqx/include/bpapi.hrl"). + +introduced_in() -> + "5.2.1". + +-spec get_delayed_message(node(), binary()) -> + emqx_delayed:with_id_return(map()) | emqx_rpc:badrpc(). +get_delayed_message(Node, Id) -> + rpc:call(Node, emqx_delayed, get_delayed_message, [Id]). + +-spec delete_delayed_message(node(), binary()) -> emqx_delayed:with_id_return() | emqx_rpc:badrpc(). +delete_delayed_message(Node, Id) -> + rpc:call(Node, emqx_delayed, delete_delayed_message, [Id]). + +%% Introduced in v2: + +-spec clear_all([node()]) -> emqx_rpc:erpc_multicall(ok). +clear_all(Nodes) -> + erpc:multicall(Nodes, emqx_delayed, clear_all_local, []). diff --git a/apps/emqx_modules/test/emqx_delayed_SUITE.erl b/apps/emqx_modules/test/emqx_delayed_SUITE.erl index 56eff8bfc..5085aa2da 100644 --- a/apps/emqx_modules/test/emqx_delayed_SUITE.erl +++ b/apps/emqx_modules/test/emqx_delayed_SUITE.erl @@ -164,15 +164,15 @@ t_cluster(_) -> ?assertMatch( {ok, _}, - emqx_delayed_proto_v1:get_delayed_message(node(), Id) + emqx_delayed_proto_v2:get_delayed_message(node(), Id) ), %% The 'local' and the 'fake-remote' values should be the same, %% however there is a race condition, so we are just assert that they are both 'ok' tuples ?assertMatch({ok, _}, emqx_delayed:get_delayed_message(Id)), - ?assertMatch({ok, _}, emqx_delayed_proto_v1:get_delayed_message(node(), Id)), + ?assertMatch({ok, _}, emqx_delayed_proto_v2:get_delayed_message(node(), Id)), - ok = emqx_delayed_proto_v1:delete_delayed_message(node(), Id), + ok = emqx_delayed_proto_v2:delete_delayed_message(node(), Id), ?assertMatch( {error, _}, diff --git a/apps/emqx_node_rebalance/src/emqx_node_rebalance.app.src b/apps/emqx_node_rebalance/src/emqx_node_rebalance.app.src index edfa6574e..f6b619d39 100644 --- a/apps/emqx_node_rebalance/src/emqx_node_rebalance.app.src +++ b/apps/emqx_node_rebalance/src/emqx_node_rebalance.app.src @@ -1,6 +1,6 @@ {application, emqx_node_rebalance, [ {description, "EMQX Node Rebalance"}, - {vsn, "5.0.4"}, + {vsn, "5.0.5"}, {registered, [ emqx_node_rebalance_sup, emqx_node_rebalance, diff --git a/apps/emqx_node_rebalance/src/emqx_node_rebalance.erl b/apps/emqx_node_rebalance/src/emqx_node_rebalance.erl index 9d53841ed..b2044c5fa 100644 --- a/apps/emqx_node_rebalance/src/emqx_node_rebalance.erl +++ b/apps/emqx_node_rebalance/src/emqx_node_rebalance.erl @@ -81,7 +81,7 @@ start_link() -> -spec available_nodes(list(node())) -> list(node()). available_nodes(Nodes) when is_list(Nodes) -> - {Available, _} = emqx_node_rebalance_proto_v1:available_nodes(Nodes), + {Available, _} = emqx_node_rebalance_proto_v2:available_nodes(Nodes), lists:filter(fun is_atom/1, Available). %%-------------------------------------------------------------------- @@ -370,7 +370,7 @@ avg(List) when length(List) >= 1 -> lists:sum(List) / length(List). multicall(Nodes, F, A) -> - case apply(emqx_node_rebalance_proto_v1, F, [Nodes | A]) of + case apply(emqx_node_rebalance_proto_v2, F, [Nodes | A]) of {Results, []} -> case lists:partition(fun is_ok/1, lists:zip(Nodes, Results)) of {OkResults, []} -> diff --git a/apps/emqx_node_rebalance/src/emqx_node_rebalance_agent.erl b/apps/emqx_node_rebalance/src/emqx_node_rebalance_agent.erl index 47708d00e..250d03d9c 100644 --- a/apps/emqx_node_rebalance/src/emqx_node_rebalance_agent.erl +++ b/apps/emqx_node_rebalance/src/emqx_node_rebalance_agent.erl @@ -14,7 +14,9 @@ -export([ start_link/0, enable/1, + enable/2, disable/1, + disable/2, status/0 ]). @@ -40,11 +42,21 @@ start_link() -> -spec enable(pid()) -> ok_or_error(already_enabled | eviction_agent_busy). enable(CoordinatorPid) -> - gen_server:call(?MODULE, {enable, CoordinatorPid}). + enable(CoordinatorPid, ?ENABLE_KIND). + +-spec enable(pid(), emqx_eviction_agent:kind()) -> + ok_or_error(already_enabled | eviction_agent_busy). +enable(CoordinatorPid, Kind) -> + gen_server:call(?MODULE, {enable, CoordinatorPid, Kind}). -spec disable(pid()) -> ok_or_error(already_disabled | invalid_coordinator). disable(CoordinatorPid) -> - gen_server:call(?MODULE, {disable, CoordinatorPid}). + disable(CoordinatorPid, ?ENABLE_KIND). + +-spec disable(pid(), emqx_eviction_agent:kind()) -> + ok_or_error(already_disabled | invalid_coordinator). +disable(CoordinatorPid, Kind) -> + gen_server:call(?MODULE, {disable, CoordinatorPid, Kind}). -spec status() -> status(). status() -> @@ -57,7 +69,7 @@ status() -> init([]) -> {ok, #{}}. -handle_call({enable, CoordinatorPid}, _From, St) -> +handle_call({enable, CoordinatorPid, Kind}, _From, St) -> case St of #{coordinator_pid := _Pid} -> {reply, {error, already_enabled}, St}; @@ -65,7 +77,7 @@ handle_call({enable, CoordinatorPid}, _From, St) -> true = link(CoordinatorPid), EvictionAgentPid = whereis(emqx_eviction_agent), true = link(EvictionAgentPid), - case emqx_eviction_agent:enable(?ENABLE_KIND, undefined) of + case emqx_eviction_agent:enable(Kind, undefined) of ok -> {reply, ok, #{ coordinator_pid => CoordinatorPid, @@ -77,13 +89,13 @@ handle_call({enable, CoordinatorPid}, _From, St) -> {reply, {error, eviction_agent_busy}, St} end end; -handle_call({disable, CoordinatorPid}, _From, St) -> +handle_call({disable, CoordinatorPid, Kind}, _From, St) -> case St of #{ coordinator_pid := CoordinatorPid, eviction_agent_pid := EvictionAgentPid } -> - _ = emqx_eviction_agent:disable(?ENABLE_KIND), + _ = emqx_eviction_agent:disable(Kind), true = unlink(EvictionAgentPid), true = unlink(CoordinatorPid), NewSt = maps:without( diff --git a/apps/emqx_node_rebalance/src/emqx_node_rebalance_api.erl b/apps/emqx_node_rebalance/src/emqx_node_rebalance_api.erl index 430ad1e34..a9dc535ad 100644 --- a/apps/emqx_node_rebalance/src/emqx_node_rebalance_api.erl +++ b/apps/emqx_node_rebalance/src/emqx_node_rebalance_api.erl @@ -31,7 +31,9 @@ '/load_rebalance/:node/start'/2, '/load_rebalance/:node/stop'/2, '/load_rebalance/:node/evacuation/start'/2, - '/load_rebalance/:node/evacuation/stop'/2 + '/load_rebalance/:node/evacuation/stop'/2, + '/load_rebalance/:node/purge/start'/2, + '/load_rebalance/:node/purge/stop'/2 ]). %% Schema examples @@ -67,6 +69,9 @@ paths() -> "/load_rebalance/:node/stop", "/load_rebalance/:node/evacuation/start", "/load_rebalance/:node/evacuation/stop" + %% TODO: uncomment after we officially release the feature. + %% "/load_rebalance/:node/purge/start", + %% "/load_rebalance/:node/purge/stop" ]. schema("/load_rebalance/status") -> @@ -176,6 +181,42 @@ schema("/load_rebalance/:node/evacuation/stop") -> } } }. +%% TODO: uncomment after we officially release the feature. +%% schema("/load_rebalance/:node/purge/start") -> +%% #{ +%% 'operationId' => '/load_rebalance/:node/purge/start', +%% post => #{ +%% tags => [<<"load_rebalance">>], +%% summary => <<"Start purge on the whole cluster">>, +%% description => ?DESC("cluster_purge_start"), +%% parameters => [param_node()], +%% 'requestBody' => +%% emqx_dashboard_swagger:schema_with_examples( +%% ref(purge_start), +%% purge_example() +%% ), +%% responses => #{ +%% 200 => response_schema(), +%% 400 => error_codes([?BAD_REQUEST], <<"Bad Request">>), +%% 404 => error_codes([?NOT_FOUND], <<"Not Found">>) +%% } +%% } +%% }; +%% schema("/load_rebalance/:node/purge/stop") -> +%% #{ +%% 'operationId' => '/load_rebalance/:node/purge/stop', +%% post => #{ +%% tags => [<<"load_rebalance">>], +%% summary => <<"Stop purge on the whole cluster">>, +%% description => ?DESC("cluster_purge_stop"), +%% parameters => [param_node()], +%% responses => #{ +%% 200 => response_schema(), +%% 400 => error_codes([?BAD_REQUEST], <<"Bad Request">>), +%% 404 => error_codes([?NOT_FOUND], <<"Not Found">>) +%% } +%% } +%% }. %%-------------------------------------------------------------------- %% Handlers @@ -188,16 +229,20 @@ schema("/load_rebalance/:node/evacuation/stop") -> {rebalance, Stats} -> {200, format_status(rebalance, Stats)}; {evacuation, Stats} -> - {200, format_status(evacuation, Stats)} + {200, format_status(evacuation, Stats)}; + {purge, Stats} -> + {200, format_status(purge, Stats)} end. '/load_rebalance/global_status'(get, #{}) -> #{ evacuations := Evacuations, + purges := Purges, rebalances := Rebalances } = emqx_node_rebalance_status:global_status(), {200, #{ evacuations => format_as_map_list(Evacuations), + purges => format_as_map_list(Purges), rebalances => format_as_map_list(Rebalances) }}. @@ -214,7 +259,7 @@ schema("/load_rebalance/:node/evacuation/stop") -> Params1 = translate(rebalance_start, Params0), with_nodes_at_key(nodes, Params1, fun(Params2) -> wrap_rpc( - Node, emqx_node_rebalance_api_proto_v1:node_rebalance_start(Node, Params2) + Node, emqx_node_rebalance_api_proto_v2:node_rebalance_start(Node, Params2) ) end) end). @@ -222,7 +267,7 @@ schema("/load_rebalance/:node/evacuation/stop") -> '/load_rebalance/:node/stop'(post, #{bindings := #{node := NodeBin}}) -> emqx_utils_api:with_node(NodeBin, fun(Node) -> wrap_rpc( - Node, emqx_node_rebalance_api_proto_v1:node_rebalance_stop(Node) + Node, emqx_node_rebalance_api_proto_v2:node_rebalance_stop(Node) ) end). @@ -234,7 +279,7 @@ schema("/load_rebalance/:node/evacuation/stop") -> with_nodes_at_key(migrate_to, Params1, fun(Params2) -> wrap_rpc( Node, - emqx_node_rebalance_api_proto_v1:node_rebalance_evacuation_start( + emqx_node_rebalance_api_proto_v2:node_rebalance_evacuation_start( Node, Params2 ) ) @@ -244,7 +289,27 @@ schema("/load_rebalance/:node/evacuation/stop") -> '/load_rebalance/:node/evacuation/stop'(post, #{bindings := #{node := NodeBin}}) -> emqx_utils_api:with_node(NodeBin, fun(Node) -> wrap_rpc( - Node, emqx_node_rebalance_api_proto_v1:node_rebalance_evacuation_stop(Node) + Node, emqx_node_rebalance_api_proto_v2:node_rebalance_evacuation_stop(Node) + ) + end). + +'/load_rebalance/:node/purge/start'(post, #{ + bindings := #{node := NodeBin}, body := Params0 +}) -> + emqx_utils_api:with_node(NodeBin, fun(Node) -> + Params1 = translate(purge_start, Params0), + wrap_rpc( + Node, + emqx_node_rebalance_api_proto_v2:node_rebalance_purge_start( + Node, Params1 + ) + ) + end). + +'/load_rebalance/:node/purge/stop'(post, #{bindings := #{node := NodeBin}}) -> + emqx_utils_api:with_node(NodeBin, fun(Node) -> + wrap_rpc( + Node, emqx_node_rebalance_api_proto_v2:node_rebalance_purge_stop(Node) ) end). @@ -483,6 +548,17 @@ fields(rebalance_evacuation_start) -> } )} ]; +fields(purge_start) -> + [ + {"purge_rate", + mk( + pos_integer(), + #{ + desc => ?DESC(purge_rate), + required => false + } + )} + ]; fields(local_status_disabled) -> [ {"status", @@ -687,6 +763,38 @@ fields(global_evacuation_status) -> } )} ]; +fields(global_purge_status) -> + without( + [ + "status", + "process", + "connection_eviction_rate", + "session_eviction_rate", + "connection_goal", + "disconnected_session_goal", + "session_recipients", + "recipients" + ], + fields(local_status_enabled) + ) ++ + [ + {"purge_rate", + mk( + pos_integer(), + #{ + desc => ?DESC(local_status_purge_rate), + required => false + } + )}, + {"node", + mk( + binary(), + #{ + desc => ?DESC(evacuation_status_node), + required => true + } + )} + ]; fields(global_status) -> [ {"evacuations", @@ -697,6 +805,14 @@ fields(global_status) -> required => false } )}, + {"purges", + mk( + hoconsc:array(ref(global_purge_status)), + #{ + desc => ?DESC(global_status_purges), + required => false + } + )}, {"rebalances", mk( hoconsc:array(ref(global_coordinator_status)), @@ -735,6 +851,10 @@ rebalance_evacuation_example() -> } }. +%% TODO: uncomment after we officially release the feature. +%% purge_example() -> +%% #{purge => #{purge_rate => 100}}. + local_status_response_schema() -> hoconsc:union([ref(local_status_disabled), ref(local_status_enabled)]). diff --git a/apps/emqx_node_rebalance/src/emqx_node_rebalance_cli.erl b/apps/emqx_node_rebalance/src/emqx_node_rebalance_cli.erl index 66f7a1789..9e0a173ea 100644 --- a/apps/emqx_node_rebalance/src/emqx_node_rebalance_cli.erl +++ b/apps/emqx_node_rebalance/src/emqx_node_rebalance_cli.erl @@ -29,6 +29,15 @@ cli(["start" | StartArgs]) -> emqx_ctl:print("Rebalance is already enabled~n"), false end; + {purge, Opts} -> + case emqx_node_rebalance_purge:start(Opts) of + ok -> + emqx_ctl:print("Rebalance(purge) started~n"), + true; + {error, Reason} -> + emqx_ctl:print("Rebalance(purge) start error: ~p~n", [Reason]), + false + end; {rebalance, Opts} -> case emqx_node_rebalance:start(Opts) of ok -> @@ -55,6 +64,7 @@ cli(["node-status"]) -> cli(["status"]) -> #{ evacuations := Evacuations, + purges := Purges, rebalances := Rebalances } = emqx_node_rebalance_status:global_status(), lists:foreach( @@ -69,6 +79,18 @@ cli(["status"]) -> end, Evacuations ), + lists:foreach( + fun({Node, Status}) -> + emqx_ctl:print( + "--------------------------------------------------------------------~n" + ), + emqx_ctl:print( + "Node ~p: purge~n~s", + [Node, emqx_node_rebalance_status:format_local_status(Status)] + ) + end, + Purges + ), lists:foreach( fun({Node, Status}) -> emqx_ctl:print( @@ -82,10 +104,14 @@ cli(["status"]) -> Rebalances ); cli(["stop"]) -> - case emqx_node_rebalance_evacuation:status() of - {enabled, _} -> - ok = emqx_node_rebalance_evacuation:stop(), - emqx_ctl:print("Rebalance(evacuation) stopped~n"), + Checks = + [ + {evacuation, fun emqx_node_rebalance_evacuation:status/0, + fun emqx_node_rebalance_evacuation:stop/0}, + {purge, fun emqx_node_rebalance_purge:status/0, fun emqx_node_rebalance_purge:stop/0} + ], + case do_stop(Checks) of + ok -> true; disabled -> case emqx_node_rebalance:status() of @@ -112,6 +138,13 @@ cli(_) -> "Start current node evacuation with optional server redirect to the specified servers" }, + %% TODO: uncomment after we officially release the feature. + %% { + %% "rebalance start --purge \\\n" + %% " [--purge-rate CountPerSec]", + %% "Start purge on all running nodes in the cluster" + %% }, + { "rebalance start \\\n" " [--nodes \"node1@host1 node2@host2\"] \\\n" @@ -140,7 +173,11 @@ cli(_) -> node_status(NodeStatus) -> case NodeStatus of - {Process, Status} when Process =:= evacuation orelse Process =:= rebalance -> + {Process, Status} when + Process =:= evacuation; + Process =:= purge; + Process =:= rebalance + -> emqx_ctl:print( "Rebalance type: ~p~n~s~n", [Process, emqx_node_rebalance_status:format_local_status(Status)] @@ -160,6 +197,13 @@ start_args(Args) -> {error, _} = Error -> Error end; + {ok, #{"--purge" := true} = Collected} -> + case validate_purge(maps:to_list(Collected), #{}) of + {ok, Validated} -> + {purge, Validated}; + {error, _} = Error -> + Error + end; {ok, #{} = Collected} -> case validate_rebalance(maps:to_list(Collected), #{}) of {ok, Validated} -> @@ -180,6 +224,11 @@ collect_args(["--redirect-to", ServerReference | Args], Map) -> collect_args(Args, Map#{"--redirect-to" => ServerReference}); collect_args(["--migrate-to", MigrateTo | Args], Map) -> collect_args(Args, Map#{"--migrate-to" => MigrateTo}); +%% purge +collect_args(["--purge" | Args], Map) -> + collect_args(Args, Map#{"--purge" => true}); +collect_args(["--purge-rate", PurgeRate | Args], Map) -> + collect_args(Args, Map#{"--purge-rate" => PurgeRate}); %% rebalance collect_args(["--nodes", Nodes | Args], Map) -> collect_args(Args, Map#{"--nodes" => Nodes}); @@ -239,6 +288,15 @@ validate_evacuation([{"--migrate-to", MigrateTo} | Rest], Map) -> validate_evacuation(Rest, _Map) -> {error, io_lib:format("unknown evacuation arguments: ~p", [Rest])}. +validate_purge([], Map) -> + {ok, Map}; +validate_purge([{"--purge", _} | Rest], Map) -> + validate_purge(Rest, Map); +validate_purge([{"--purge-rate", _} | _] = Opts, Map) -> + validate_pos_int(purge_rate, Opts, Map, fun validate_purge/2); +validate_purge(Rest, _Map) -> + {error, io_lib:format("unknown purge arguments: ~p", [Rest])}. + validate_rebalance([], Map) -> {ok, Map}; validate_rebalance([{"--wait-health-check", _} | _] = Opts, Map) -> @@ -306,3 +364,15 @@ strings_to_atoms([Str | Rest], Atoms, Invalid) -> {error, _} -> strings_to_atoms(Rest, Atoms, [Str | Invalid]) end. + +do_stop([{Type, Check, Stop} | Rest]) -> + case Check() of + {enabled, _} -> + ok = Stop(), + emqx_ctl:print("Rebalance(~s) stopped~n", [Type]), + ok; + disabled -> + do_stop(Rest) + end; +do_stop([]) -> + disabled. diff --git a/apps/emqx_node_rebalance/src/emqx_node_rebalance_purge.erl b/apps/emqx_node_rebalance/src/emqx_node_rebalance_purge.erl new file mode 100644 index 000000000..81f1bfe03 --- /dev/null +++ b/apps/emqx_node_rebalance/src/emqx_node_rebalance_purge.erl @@ -0,0 +1,233 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_node_rebalance_purge). + +-include("emqx_node_rebalance.hrl"). + +-include_lib("emqx/include/logger.hrl"). +-include_lib("emqx/include/types.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). + +-export([ + start/1, + status/0, + stop/0 +]). + +-export([start_link/0]). + +-behaviour(gen_statem). + +-export([ + init/1, + callback_mode/0, + handle_event/4, + code_change/4 +]). + +-export_type([ + start_opts/0, + start_error/0, + stop_error/0 +]). + +%%-------------------------------------------------------------------- +%% APIs +%%-------------------------------------------------------------------- + +-define(DEFAULT_PURGE_RATE, 500). +-define(ENABLE_KIND, purge). + +%% gen_statem states +-define(disabled, disabled). +-define(purging, purging). +-define(cleaning_data, cleaning_data). + +-type start_opts() :: #{ + purge_rate => pos_integer() +}. +-type start_error() :: already_started. +-type stop_error() :: not_started. +-type stats() :: #{ + initial_sessions := non_neg_integer(), + current_sessions := non_neg_integer(), + purge_rate := pos_integer() +}. +-type status() :: {enabled, stats()} | disabled. + +-spec start(start_opts()) -> ok_or_error(start_error()). +start(StartOpts) -> + Opts = maps:merge(default_opts(), StartOpts), + gen_statem:call(?MODULE, {start, Opts}). + +-spec stop() -> ok_or_error(not_started). +stop() -> + gen_statem:call(?MODULE, stop). + +-spec status() -> status(). +status() -> + gen_statem:call(?MODULE, status). + +-spec start_link() -> startlink_ret(). +start_link() -> + gen_statem:start_link({local, ?MODULE}, ?MODULE, [], []). + +%%-------------------------------------------------------------------- +%% gen_statem callbacks +%%-------------------------------------------------------------------- + +callback_mode() -> handle_event_function. + +%% states: disabled, purging, cleaning_data + +init([]) -> + {ok, disabled, #{}}. + +%% start +handle_event({call, From}, {start, Opts}, ?disabled, #{} = Data) -> + ok = enable_purge(), + ?SLOG(warning, #{ + msg => "cluster_purge_started", + opts => Opts + }), + NewData = init_data(Data, Opts), + {next_state, ?purging, NewData, [ + {state_timeout, 0, purge}, + {reply, From, ok} + ]}; +handle_event({call, From}, {start, _Opts}, _State, #{}) -> + {keep_state_and_data, [{reply, From, {error, already_started}}]}; +%% stop +handle_event({call, From}, stop, ?disabled, #{}) -> + {keep_state_and_data, [{reply, From, {error, not_started}}]}; +handle_event({call, From}, stop, _State, Data) -> + ok = disable_purge(), + ?SLOG(warning, #{msg => "cluster_purge_stopped"}), + {next_state, disabled, deinit(Data), [{reply, From, ok}]}; +%% status +handle_event({call, From}, status, ?disabled, #{}) -> + {keep_state_and_data, [{reply, From, disabled}]}; +handle_event({call, From}, status, State, Data) -> + Stats = maps:with( + [ + initial_sessions, + current_sessions, + purge_rate + ], + Data + ), + {keep_state_and_data, [ + {reply, From, {enabled, Stats#{state => State}}} + ]}; +%% session purge +handle_event( + state_timeout, + purge, + ?purging, + #{ + purge_rate := PurgeRate + } = Data +) -> + case emqx_eviction_agent:all_channels_count() of + Sessions when Sessions > 0 -> + ok = purge_sessions(PurgeRate), + ?tp( + warning, + "cluster_purge_evict_sessions", + #{ + count => Sessions, + purge_rate => PurgeRate + } + ), + NewData = Data#{current_sessions => Sessions}, + {keep_state, NewData, [{state_timeout, ?EVICT_INTERVAL, purge}]}; + _Sessions = 0 -> + NewData = Data#{current_conns => 0}, + ?SLOG(warning, #{msg => "cluster_purge_evict_sessions_done"}), + {next_state, ?cleaning_data, NewData, [ + {state_timeout, 0, clean_retained_messages} + ]} + end; +handle_event( + state_timeout, + clean_retained_messages, + ?cleaning_data, + Data +) -> + ?SLOG(warning, #{msg => "cluster_purge_cleaning_data"}), + ok = emqx_retainer:clean(), + ok = emqx_delayed:clear_all(), + ?tp(warning, "cluster_purge_done", #{}), + ok = disable_purge(), + ?tp(warning, "cluster_purge_finished_successfully", #{}), + {next_state, ?disabled, deinit(Data)}; +handle_event({call, From}, Msg, State, Data) -> + ?SLOG(warning, #{msg => "unknown_call", call => Msg, state => State, data => Data}), + {keep_state_and_data, [{reply, From, ignored}]}; +handle_event(info, Msg, State, Data) -> + ?SLOG(warning, #{msg => "unknown_info", info => Msg, state => State, data => Data}), + keep_state_and_data; +handle_event(cast, Msg, State, Data) -> + ?SLOG(warning, #{msg => "unknown_cast", cast => Msg, state => State, data => Data}), + keep_state_and_data. + +code_change(_Vsn, State, Data, _Extra) -> + {ok, State, Data}. + +%%-------------------------------------------------------------------- +%% internal funs +%%-------------------------------------------------------------------- + +default_opts() -> + #{ + purge_rate => ?DEFAULT_PURGE_RATE + }. + +init_data(Data0, Opts) -> + Data1 = maps:merge(Data0, Opts), + SessCount = emqx_eviction_agent:session_count(), + Data1#{ + initial_sessions => SessCount, + current_sessions => SessCount + }. + +deinit(Data) -> + Keys = + [initial_sessions, current_sessions | maps:keys(default_opts())], + maps:without(Keys, Data). + +multicall(Nodes, F, A) -> + case apply(emqx_node_rebalance_proto_v2, F, [Nodes | A]) of + {Results, []} -> + case lists:partition(fun is_ok/1, lists:zip(Nodes, Results)) of + {_OkResults, []} -> + ok; + {_, BadResults} -> + %% we crash on errors so that the coordinator death is signalled to + %% the eviction agents in the cluster. + error({bad_nodes, BadResults}) + end; + {_, [_BadNode | _] = BadNodes} -> + error({bad_nodes, BadNodes}) + end. + +is_ok({_Node, {ok, _}}) -> true; +is_ok({_Node, ok}) -> true; +is_ok(_) -> false. + +enable_purge() -> + Nodes = emqx:running_nodes(), + _ = multicall(Nodes, enable_rebalance_agent, [self(), ?ENABLE_KIND]), + ok. + +disable_purge() -> + Nodes = emqx:running_nodes(), + _ = multicall(Nodes, disable_rebalance_agent, [self(), ?ENABLE_KIND]), + ok. + +purge_sessions(PurgeRate) -> + Nodes = emqx:running_nodes(), + _ = multicall(Nodes, purge_sessions, [PurgeRate]), + ok. diff --git a/apps/emqx_node_rebalance/src/emqx_node_rebalance_status.erl b/apps/emqx_node_rebalance/src/emqx_node_rebalance_status.erl index a0102c4f4..dbeb4d97f 100644 --- a/apps/emqx_node_rebalance/src/emqx_node_rebalance_status.erl +++ b/apps/emqx_node_rebalance/src/emqx_node_rebalance_status.erl @@ -15,6 +15,7 @@ %% For RPC -export([ evacuation_status/0, + purge_status/0, rebalance_status/0 ]). @@ -22,11 +23,13 @@ %% APIs %%-------------------------------------------------------------------- --spec local_status() -> disabled | {evacuation, map()} | {rebalance, map()}. +-spec local_status() -> disabled | {evacuation, map()} | {purge, map()} | {rebalance, map()}. local_status() -> - case emqx_node_rebalance_evacuation:status() of - {enabled, Status} -> - {evacuation, evacuation(Status)}; + Checks = [ + {evacuation, fun emqx_node_rebalance_evacuation:status/0, fun evacuation/1}, + {purge, fun emqx_node_rebalance_purge:status/0, fun purge/1} + ], + case do_local_status(Checks) of disabled -> case emqx_node_rebalance_agent:status() of {enabled, CoordinatorPid} -> @@ -38,28 +41,37 @@ local_status() -> end; disabled -> disabled - end + end; + Res -> + Res end. --spec local_status(node()) -> disabled | {evacuation, map()} | {rebalance, map()}. +-spec local_status(node()) -> disabled | {evacuation, map()} | {purge, map()} | {rebalance, map()}. local_status(Node) -> - emqx_node_rebalance_status_proto_v1:local_status(Node). + emqx_node_rebalance_status_proto_v2:local_status(Node). -spec format_local_status(map()) -> iodata(). format_local_status(Status) -> format_status(Status, local_status_field_format_order()). --spec global_status() -> #{rebalances := [{node(), map()}], evacuations := [{node(), map()}]}. +-spec global_status() -> + #{ + rebalances := [{node(), map()}], + evacuations := [{node(), map()}], + purges := [{node(), map()}] + }. global_status() -> Nodes = emqx:running_nodes(), - {RebalanceResults, _} = emqx_node_rebalance_status_proto_v1:rebalance_status(Nodes), + {RebalanceResults, _} = emqx_node_rebalance_status_proto_v2:rebalance_status(Nodes), Rebalances = [ {Node, coordinator_rebalance(Status)} || {Node, {enabled, Status}} <- RebalanceResults ], - {EvacuatioResults, _} = emqx_node_rebalance_status_proto_v1:evacuation_status(Nodes), - Evacuations = [{Node, evacuation(Status)} || {Node, {enabled, Status}} <- EvacuatioResults], - #{rebalances => Rebalances, evacuations => Evacuations}. + {EvacuationResults, _} = emqx_node_rebalance_status_proto_v2:evacuation_status(Nodes), + Evacuations = [{Node, evacuation(Status)} || {Node, {enabled, Status}} <- EvacuationResults], + {PurgeResults, _} = emqx_node_rebalance_status_proto_v2:purge_status(Nodes), + Purges = [{Node, purge(Status)} || {Node, {enabled, Status}} <- PurgeResults], + #{rebalances => Rebalances, evacuations => Evacuations, purges => Purges}. -spec format_coordinator_status(map()) -> iodata(). format_coordinator_status(Status) -> @@ -85,6 +97,17 @@ evacuation(Status) -> } }. +purge(Status) -> + #{ + state => maps:get(state, Status), + purge_rate => maps:get(purge_rate, Status), + session_goal => 0, + stats => #{ + initial_sessions => maps:get(initial_sessions, Status), + current_sessions => maps:get(current_sessions, Status) + } + }. + local_rebalance(#{donors := Donors} = Stats, Node) -> case lists:member(Node, Donors) of true -> {rebalance, donor_rebalance(Stats, Node)}; @@ -159,6 +182,7 @@ local_status_field_format_order() -> coordinator_node, connection_eviction_rate, session_eviction_rate, + purge_rate, connection_goal, session_goal, disconnected_session_goal, @@ -201,6 +225,8 @@ format_local_status_field({connection_eviction_rate, ConnEvictRate}) -> io_lib:format("Connection eviction rate: ~p connections/second~n", [ConnEvictRate]); format_local_status_field({session_eviction_rate, SessEvictRate}) -> io_lib:format("Session eviction rate: ~p sessions/second~n", [SessEvictRate]); +format_local_status_field({purge_rate, PurgeRate}) -> + io_lib:format("Purge rate: ~p sessions/second~n", [PurgeRate]); format_local_status_field({connection_goal, ConnGoal}) -> io_lib:format("Connection goal: ~p~n", [ConnGoal]); format_local_status_field({session_goal, SessGoal}) -> @@ -231,8 +257,21 @@ format_local_stats(Stats) -> ) ]. +do_local_status([{Type, Get, Cont} | Rest]) -> + case Get() of + disabled -> + do_local_status(Rest); + {enabled, Status} -> + {Type, Cont(Status)} + end; +do_local_status([]) -> + disabled. + evacuation_status() -> {node(), emqx_node_rebalance_evacuation:status()}. +purge_status() -> + {node(), emqx_node_rebalance_purge:status()}. + rebalance_status() -> {node(), emqx_node_rebalance:status()}. diff --git a/apps/emqx_node_rebalance/src/emqx_node_rebalance_sup.erl b/apps/emqx_node_rebalance/src/emqx_node_rebalance_sup.erl index cfaccc4c2..8ec16c1e7 100644 --- a/apps/emqx_node_rebalance/src/emqx_node_rebalance_sup.erl +++ b/apps/emqx_node_rebalance/src/emqx_node_rebalance_sup.erl @@ -15,6 +15,7 @@ start_link() -> init([]) -> Childs = [ + child_spec(emqx_node_rebalance_purge, []), child_spec(emqx_node_rebalance_evacuation, []), child_spec(emqx_node_rebalance_agent, []), child_spec(emqx_node_rebalance, []) diff --git a/apps/emqx_node_rebalance/src/proto/emqx_node_rebalance_api_proto_v2.erl b/apps/emqx_node_rebalance/src/proto/emqx_node_rebalance_api_proto_v2.erl new file mode 100644 index 000000000..2b5b4bca3 --- /dev/null +++ b/apps/emqx_node_rebalance/src/proto/emqx_node_rebalance_api_proto_v2.erl @@ -0,0 +1,59 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_node_rebalance_api_proto_v2). + +-behaviour(emqx_bpapi). + +-export([ + introduced_in/0, + + node_rebalance_evacuation_start/2, + node_rebalance_evacuation_stop/1, + + node_rebalance_start/2, + node_rebalance_stop/1, + + %% Introduced in v2: + node_rebalance_purge_start/2, + node_rebalance_purge_stop/1 +]). + +-include_lib("emqx/include/bpapi.hrl"). +-include_lib("emqx/include/types.hrl"). + +introduced_in() -> + "5.2.1". + +-spec node_rebalance_evacuation_start(node(), emqx_node_rebalance_evacuation:start_opts()) -> + emqx_rpc:badrpc() | ok_or_error(emqx_node_rebalance_evacuation:start_error()). +node_rebalance_evacuation_start(Node, #{} = Opts) -> + rpc:call(Node, emqx_node_rebalance_evacuation, start, [Opts]). + +-spec node_rebalance_evacuation_stop(node()) -> + emqx_rpc:badrpc() | ok_or_error(not_started). +node_rebalance_evacuation_stop(Node) -> + rpc:call(Node, emqx_node_rebalance_evacuation, stop, []). + +-spec node_rebalance_start(node(), emqx_node_rebalance:start_opts()) -> + emqx_rpc:badrpc() | ok_or_error(emqx_node_rebalance:start_error()). +node_rebalance_start(Node, Opts) -> + rpc:call(Node, emqx_node_rebalance, start, [Opts]). + +-spec node_rebalance_stop(node()) -> + emqx_rpc:badrpc() | ok_or_error(not_started). +node_rebalance_stop(Node) -> + rpc:call(Node, emqx_node_rebalance, stop, []). + +%% Introduced in v2: + +-spec node_rebalance_purge_start(node(), emqx_node_rebalance_purge:start_opts()) -> + emqx_rpc:badrpc() | ok_or_error(emqx_node_rebalance_purge:start_error()). +node_rebalance_purge_start(Node, #{} = Opts) -> + rpc:call(Node, emqx_node_rebalance_purge, start, [Opts]). + +-spec node_rebalance_purge_stop(node()) -> + emqx_rpc:badrpc() | ok_or_error(emqx_node_rebalance_purge:stop_error()). +node_rebalance_purge_stop(Node) -> + rpc:call(Node, emqx_node_rebalance_purge, stop, []). diff --git a/apps/emqx_node_rebalance/src/proto/emqx_node_rebalance_proto_v2.erl b/apps/emqx_node_rebalance/src/proto/emqx_node_rebalance_proto_v2.erl new file mode 100644 index 000000000..ca8233288 --- /dev/null +++ b/apps/emqx_node_rebalance/src/proto/emqx_node_rebalance_proto_v2.erl @@ -0,0 +1,84 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_node_rebalance_proto_v2). + +-behaviour(emqx_bpapi). + +-export([ + introduced_in/0, + + available_nodes/1, + evict_connections/2, + evict_sessions/4, + connection_counts/1, + session_counts/1, + enable_rebalance_agent/2, + disable_rebalance_agent/2, + disconnected_session_counts/1, + + %% Introduced in v2: + enable_rebalance_agent/3, + disable_rebalance_agent/3, + purge_sessions/2 +]). + +-include_lib("emqx/include/bpapi.hrl"). +-include_lib("emqx/include/types.hrl"). + +introduced_in() -> + "5.2.1". + +-spec available_nodes([node()]) -> emqx_rpc:multicall_result(node()). +available_nodes(Nodes) -> + rpc:multicall(Nodes, emqx_node_rebalance, is_node_available, []). + +-spec evict_connections([node()], non_neg_integer()) -> + emqx_rpc:multicall_result(ok_or_error(disabled)). +evict_connections(Nodes, Count) -> + rpc:multicall(Nodes, emqx_eviction_agent, evict_connections, [Count]). + +-spec evict_sessions([node()], non_neg_integer(), [node()], emqx_channel:conn_state()) -> + emqx_rpc:multicall_result(ok_or_error(disabled)). +evict_sessions(Nodes, Count, RecipientNodes, ConnState) -> + rpc:multicall(Nodes, emqx_eviction_agent, evict_sessions, [Count, RecipientNodes, ConnState]). + +-spec connection_counts([node()]) -> emqx_rpc:multicall_result({ok, non_neg_integer()}). +connection_counts(Nodes) -> + rpc:multicall(Nodes, emqx_node_rebalance, connection_count, []). + +-spec session_counts([node()]) -> emqx_rpc:multicall_result({ok, non_neg_integer()}). +session_counts(Nodes) -> + rpc:multicall(Nodes, emqx_node_rebalance, session_count, []). + +-spec enable_rebalance_agent([node()], pid()) -> + emqx_rpc:multicall_result(ok_or_error(already_enabled | eviction_agent_busy)). +enable_rebalance_agent(Nodes, OwnerPid) -> + rpc:multicall(Nodes, emqx_node_rebalance_agent, enable, [OwnerPid]). + +-spec disable_rebalance_agent([node()], pid()) -> + emqx_rpc:multicall_result(ok_or_error(already_disabled | invalid_coordinator)). +disable_rebalance_agent(Nodes, OwnerPid) -> + rpc:multicall(Nodes, emqx_node_rebalance_agent, disable, [OwnerPid]). + +-spec disconnected_session_counts([node()]) -> emqx_rpc:multicall_result({ok, non_neg_integer()}). +disconnected_session_counts(Nodes) -> + rpc:multicall(Nodes, emqx_node_rebalance, disconnected_session_count, []). + +%% Introduced in v2: + +-spec enable_rebalance_agent([node()], pid(), emqx_eviction_agent:kind()) -> + emqx_rpc:multicall_result(ok_or_error(already_enabled | eviction_agent_busy)). +enable_rebalance_agent(Nodes, OwnerPid, Kind) -> + rpc:multicall(Nodes, emqx_node_rebalance_agent, enable, [OwnerPid, Kind]). + +-spec disable_rebalance_agent([node()], pid(), emqx_eviction_agent:kind()) -> + emqx_rpc:multicall_result(ok_or_error(already_disabled | invalid_coordinator)). +disable_rebalance_agent(Nodes, OwnerPid, Kind) -> + rpc:multicall(Nodes, emqx_node_rebalance_agent, disable, [OwnerPid, Kind]). + +-spec purge_sessions([node()], non_neg_integer()) -> + emqx_rpc:multicall_result(ok_or_error(disabled)). +purge_sessions(Nodes, Count) -> + rpc:multicall(Nodes, emqx_eviction_agent, purge_sessions, [Count]). diff --git a/apps/emqx_node_rebalance/src/proto/emqx_node_rebalance_purge_proto_v1.erl b/apps/emqx_node_rebalance/src/proto/emqx_node_rebalance_purge_proto_v1.erl new file mode 100644 index 000000000..8b2b63bc4 --- /dev/null +++ b/apps/emqx_node_rebalance/src/proto/emqx_node_rebalance_purge_proto_v1.erl @@ -0,0 +1,29 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_node_rebalance_purge_proto_v1). + +-behaviour(emqx_bpapi). + +-export([ + introduced_in/0, + + start/2, + stop/1 +]). + +-include_lib("emqx/include/bpapi.hrl"). + +introduced_in() -> + "5.2.1". + +-spec start([node()], emqx_node_rebalance_purge:start_opts()) -> + emqx_rpc:erpc_multicall(ok | {error, emqx_node_rebalance_purge:start_error()}). +start(Nodes, Opts) -> + erpc:multicall(Nodes, emqx_node_rebalance_purge, start, [Opts]). + +-spec stop([node()]) -> + emqx_rpc:erpc_multicall(ok | {error, emqx_node_rebalance_purge:stop_error()}). +stop(Nodes) -> + erpc:multicall(Nodes, emqx_node_rebalance_purge, stop, []). diff --git a/apps/emqx_node_rebalance/src/proto/emqx_node_rebalance_status_proto_v2.erl b/apps/emqx_node_rebalance/src/proto/emqx_node_rebalance_status_proto_v2.erl new file mode 100644 index 000000000..af8981b59 --- /dev/null +++ b/apps/emqx_node_rebalance/src/proto/emqx_node_rebalance_status_proto_v2.erl @@ -0,0 +1,46 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_node_rebalance_status_proto_v2). + +-behaviour(emqx_bpapi). + +-export([ + introduced_in/0, + + local_status/1, + rebalance_status/1, + evacuation_status/1, + + %% Introduced in v2: + purge_status/1 +]). + +-include_lib("emqx/include/bpapi.hrl"). +-include_lib("emqx/include/types.hrl"). + +introduced_in() -> + "5.2.1". + +-spec local_status(node()) -> + emqx_rpc:badrpc() | disabled | {evacuation, map()} | {rebalance, map()}. +local_status(Node) -> + rpc:call(Node, emqx_node_rebalance_status, local_status, []). + +-spec rebalance_status([node()]) -> + emqx_rpc:multicall_result({node(), map()}). +rebalance_status(Nodes) -> + rpc:multicall(Nodes, emqx_node_rebalance_status, rebalance_status, []). + +-spec evacuation_status([node()]) -> + emqx_rpc:multicall_result({node(), map()}). +evacuation_status(Nodes) -> + rpc:multicall(Nodes, emqx_node_rebalance_status, evacuation_status, []). + +%% Introduced in v2: + +-spec purge_status([node()]) -> + emqx_rpc:multicall_result({node(), map()}). +purge_status(Nodes) -> + rpc:multicall(Nodes, emqx_node_rebalance_status, purge_status, []). diff --git a/apps/emqx_node_rebalance/test/emqx_node_rebalance_api_SUITE.erl b/apps/emqx_node_rebalance/test/emqx_node_rebalance_api_SUITE.erl index 188e6bf71..017e85971 100644 --- a/apps/emqx_node_rebalance/test/emqx_node_rebalance_api_SUITE.erl +++ b/apps/emqx_node_rebalance/test/emqx_node_rebalance_api_SUITE.erl @@ -38,32 +38,35 @@ end_per_suite(_Config) -> ok. init_per_testcase(Case, Config) -> - [{DonorNode, _} | _] = - ClusterNodes = emqx_eviction_agent_test_helpers:start_cluster( - [ - {case_specific_node_name(?MODULE, Case, '_donor'), 2883}, - {case_specific_node_name(?MODULE, Case, '_recipient'), 3883} - ], - ?START_APPS, - [{emqx, data_dir, case_specific_data_dir(Case, Config)}] + DonorNode = case_specific_node_name(?MODULE, Case, '_donor'), + RecipientNode = case_specific_node_name(?MODULE, Case, '_recipient'), + Spec = #{ + role => core, + join_to => emqx_cth_cluster:node_name(DonorNode), + listeners => true, + apps => app_specs() + }, + Cluster = [{Node, Spec} || Node <- [DonorNode, RecipientNode]], + ClusterNodes = + [Node1 | _] = emqx_cth_cluster:start( + Cluster, + #{work_dir => ?config(priv_dir, Config)} ), - - ok = rpc:call(DonorNode, emqx_mgmt_api_test_util, init_suite, []), - ok = take_auth_header_from(DonorNode), - + ok = rpc:call(Node1, emqx_mgmt_api_test_util, init_suite, []), + ok = take_auth_header_from(Node1), [{cluster_nodes, ClusterNodes} | Config]. end_per_testcase(_Case, Config) -> - _ = emqx_eviction_agent_test_helpers:stop_cluster( - ?config(cluster_nodes, Config), - ?START_APPS - ). + Nodes = ?config(cluster_nodes, Config), + erpc:multicall(Nodes, meck, unload, []), + _ = emqx_cth_cluster:stop(Nodes), + ok. %%-------------------------------------------------------------------- %% Tests %%-------------------------------------------------------------------- t_start_evacuation_validation(Config) -> - [{DonorNode, _}, {RecipientNode, _}] = ?config(cluster_nodes, Config), + [DonorNode, RecipientNode] = ?config(cluster_nodes, Config), BadOpts = [ #{conn_evict_rate => <<"conn">>}, #{sess_evict_rate => <<"sess">>}, @@ -117,10 +120,87 @@ t_start_evacuation_validation(Config) -> api_get(["load_rebalance", "global_status"]) ). +%% TODO: uncomment after we officially release the feature. +skipped_t_start_purge_validation(Config) -> + [Node1 | _] = ?config(cluster_nodes, Config), + Port1 = get_mqtt_port(Node1, tcp), + BadOpts = [ + #{purge_rate => <<"conn">>}, + #{purge_rate => 0}, + #{purge_rate => -1}, + #{purge_rate => 1.1}, + #{unknown => <<"Value">>} + ], + lists:foreach( + fun(Opts) -> + ?assertMatch( + {ok, 400, #{}}, + api_post( + ["load_rebalance", atom_to_list(Node1), "purge", "start"], + Opts + ), + Opts + ) + end, + BadOpts + ), + ?assertMatch( + {ok, 404, #{}}, + api_post( + ["load_rebalance", "bad@node", "purge", "start"], + #{} + ) + ), + + process_flag(trap_exit, true), + Conns = emqtt_connect_many(Port1, 100), + + ?assertMatch( + {ok, 200, #{}}, + api_post( + ["load_rebalance", atom_to_list(Node1), "purge", "start"], + #{purge_rate => 10} + ) + ), + + Node1Bin = atom_to_binary(Node1), + ?assertMatch( + {ok, 200, #{<<"purges">> := [#{<<"node">> := Node1Bin}]}}, + api_get(["load_rebalance", "global_status"]) + ), + + ?assertMatch( + {ok, 200, #{ + <<"process">> := <<"purge">>, + <<"purge_rate">> := 10, + <<"session_goal">> := 0, + <<"state">> := <<"purging">>, + <<"stats">> := + #{ + <<"current_sessions">> := _, + <<"initial_sessions">> := 100 + } + }}, + api_get(["load_rebalance", "status"]) + ), + + ?assertMatch( + {ok, 200, #{}}, + api_post( + ["load_rebalance", atom_to_list(Node1), "purge", "stop"], + #{} + ) + ), + + ok = stop_many(Conns), + + ok. + t_start_rebalance_validation(Config) -> process_flag(trap_exit, true), - [{DonorNode, DonorPort}, {RecipientNode, _}] = ?config(cluster_nodes, Config), + [DonorNode, RecipientNode] = ?config(cluster_nodes, Config), + DonorPort = get_mqtt_port(DonorNode, tcp), BadOpts = [ #{conn_evict_rate => <<"conn">>}, @@ -189,7 +269,7 @@ t_start_rebalance_validation(Config) -> ok = stop_many(Conns). t_start_stop_evacuation(Config) -> - [{DonorNode, _}, {RecipientNode, _}] = ?config(cluster_nodes, Config), + [DonorNode, RecipientNode] = ?config(cluster_nodes, Config), StartOpts = maps:merge( maps:get(evacuation, emqx_node_rebalance_api:rebalance_evacuation_example()), @@ -284,7 +364,8 @@ t_start_stop_evacuation(Config) -> t_start_stop_rebalance(Config) -> process_flag(trap_exit, true), - [{DonorNode, DonorPort}, {RecipientNode, _}] = ?config(cluster_nodes, Config), + [DonorNode, RecipientNode] = ?config(cluster_nodes, Config), + DonorPort = get_mqtt_port(DonorNode, tcp), ?assertMatch( {ok, 200, #{<<"status">> := <<"disabled">>}}, @@ -390,7 +471,7 @@ t_start_stop_rebalance(Config) -> ok = stop_many(Conns). t_availability_check(Config) -> - [{DonorNode, _} | _] = ?config(cluster_nodes, Config), + [DonorNode | _] = ?config(cluster_nodes, Config), ?assertMatch( {ok, 200, #{}}, api_get(["load_rebalance", "availability_check"]) @@ -425,7 +506,12 @@ api_get(Path) -> api_post(Path, Data) -> case request(post, uri(Path), Data) of {ok, Code, ResponseBody} -> - {ok, Code, jiffy:decode(ResponseBody, [return_maps])}; + Res = + case emqx_utils_json:safe_decode(ResponseBody, [return_maps]) of + {ok, Decoded} -> Decoded; + {error, _} -> ResponseBody + end, + {ok, Code, Res}; {error, _} = Error -> Error end. @@ -444,3 +530,26 @@ case_specific_data_dir(Case, Config) -> undefined -> undefined; PrivDir -> filename:join(PrivDir, atom_to_list(Case)) end. + +app_specs() -> + [ + {emqx, #{ + before_start => fun() -> + emqx_app:set_config_loader(?MODULE) + end, + override_env => [{boot_modules, [broker, listeners]}] + }}, + {emqx_retainer, #{ + config => + #{ + retainer => + #{enable => true} + } + }}, + emqx_eviction_agent, + emqx_node_rebalance + ]. + +get_mqtt_port(Node, Type) -> + {_IP, Port} = erpc:call(Node, emqx_config, get, [[listeners, Type, default, bind]]), + Port. diff --git a/apps/emqx_node_rebalance/test/emqx_node_rebalance_cli_SUITE.erl b/apps/emqx_node_rebalance/test/emqx_node_rebalance_cli_SUITE.erl index 54ecad026..7d0cab0ce 100644 --- a/apps/emqx_node_rebalance/test/emqx_node_rebalance_cli_SUITE.erl +++ b/apps/emqx_node_rebalance/test/emqx_node_rebalance_cli_SUITE.erl @@ -156,6 +156,80 @@ t_evacuation(_Config) -> emqx_node_rebalance_evacuation:status() ). +t_purge(_Config) -> + %% start with invalid args + ?assertNot( + emqx_node_rebalance_cli:cli(["start", "--purge", "--foo-bar"]) + ), + + ?assertNot( + emqx_node_rebalance_cli:cli(["start", "--purge", "--purge-rate", "foobar"]) + ), + + %% not used by this scenario + ?assertNot( + emqx_node_rebalance_cli:cli(["start", "--purge", "--conn-evict-rate", "1"]) + ), + + ?assertNot( + emqx_node_rebalance_cli:cli(["start", "--purge", "--sess-evict-rate", "1"]) + ), + + ?assertNot( + emqx_node_rebalance_cli:cli(["start", "--purge", "--wait-takeover", "1"]) + ), + + ?assertNot( + emqx_node_rebalance_cli:cli([ + "start", + "--purge", + "--migrate-to", + atom_to_list(node()) + ]) + ), + with_some_sessions(fun() -> + ?assert( + emqx_node_rebalance_cli:cli([ + "start", + "--purge", + "--purge-rate", + "10" + ]) + ), + + %% status + ok = emqx_node_rebalance_cli:cli(["status"]), + ok = emqx_node_rebalance_cli:cli(["node-status"]), + ok = emqx_node_rebalance_cli:cli(["node-status", atom_to_list(node())]), + + ?assertMatch( + {enabled, #{}}, + emqx_node_rebalance_purge:status() + ), + + %% already enabled + ?assertNot( + emqx_node_rebalance_cli:cli([ + "start", + "--purge", + "--purge-rate", + "10" + ]) + ), + true = emqx_node_rebalance_cli:cli(["stop"]), + ok + end), + %% stop + + false = emqx_node_rebalance_cli:cli(["stop"]), + + ?assertEqual( + disabled, + emqx_node_rebalance_purge:status() + ), + + ok. + t_rebalance(Config) -> process_flag(trap_exit, true), @@ -289,3 +363,12 @@ emqx_node_rebalance_cli(Node, Args) -> Result -> Result end. + +%% to avoid it finishing too fast +with_some_sessions(Fn) -> + emqx_common_test_helpers:with_mock( + emqx_eviction_agent, + all_channels_count, + fun() -> 100 end, + Fn + ). diff --git a/apps/emqx_node_rebalance/test/emqx_node_rebalance_purge_SUITE.erl b/apps/emqx_node_rebalance/test/emqx_node_rebalance_purge_SUITE.erl new file mode 100644 index 000000000..7cdcc4d71 --- /dev/null +++ b/apps/emqx_node_rebalance/test/emqx_node_rebalance_purge_SUITE.erl @@ -0,0 +1,360 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_node_rebalance_purge_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("emqx/include/emqx_mqtt.hrl"). +-include_lib("emqx/include/asserts.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). + +-import( + emqx_eviction_agent_test_helpers, + [ + emqtt_connect/1, + emqtt_try_connect/1, + case_specific_node_name/3 + ] +). + +all() -> + [{group, one_node}, {group, two_nodes}]. + +groups() -> + [ + {one_node, [], one_node_cases()}, + {two_nodes, [], two_nodes_cases()} + ]. + +two_nodes_cases() -> + [ + t_already_started_two, + t_session_purged + ]. + +one_node_cases() -> + emqx_common_test_helpers:all(?MODULE) -- two_nodes_cases(). + +init_per_suite(Config) -> + ok = emqx_common_test_helpers:start_apps([]), + Config. + +end_per_suite(_Config) -> + ok = emqx_common_test_helpers:stop_apps([]), + ok. + +init_per_group(one_node, Config) -> + [{cluster_type, one_node} | Config]; +init_per_group(two_nodes, Config) -> + [{cluster_type, two_nodes} | Config]. + +end_per_group(_Group, _Config) -> + ok. + +init_per_testcase(TestCase, Config) -> + ct:timetrap({seconds, 30}), + Nodes = + [Node1 | _] = + case ?config(cluster_type, Config) of + one_node -> + [case_specific_node_name(?MODULE, TestCase, '_1')]; + two_nodes -> + [ + case_specific_node_name(?MODULE, TestCase, '_1'), + case_specific_node_name(?MODULE, TestCase, '_2') + ] + end, + Spec = #{ + role => core, + join_to => emqx_cth_cluster:node_name(Node1), + listeners => true, + apps => app_specs() + }, + Cluster = [{Node, Spec} || Node <- Nodes], + ClusterNodes = emqx_cth_cluster:start( + Cluster, + #{work_dir => ?config(priv_dir, Config)} + ), + ok = snabbkaffe:start_trace(), + [{cluster_nodes, ClusterNodes} | Config]. + +end_per_testcase(_TestCase, Config) -> + Nodes = ?config(cluster_nodes, Config), + ok = snabbkaffe:stop(), + erpc:multicall(Nodes, meck, unload, []), + ok = emqx_cth_cluster:stop(Nodes), + ok. + +%%-------------------------------------------------------------------- +%% Helpers +%%-------------------------------------------------------------------- + +app_specs() -> + [ + {emqx, #{ + before_start => fun() -> + emqx_app:set_config_loader(?MODULE) + end, + override_env => [{boot_modules, [broker, listeners]}] + }}, + {emqx_retainer, #{ + config => + #{ + retainer => + #{enable => true} + } + }}, + {emqx_modules, #{ + config => + #{delayed => #{enable => true}} + }}, + emqx_eviction_agent, + emqx_node_rebalance + ]. + +opts(_Config) -> + #{ + purge_rate => 10 + }. + +case_specific_data_dir(Case, Config) -> + case ?config(priv_dir, Config) of + undefined -> undefined; + PrivDir -> filename:join(PrivDir, atom_to_list(Case)) + end. + +get_mqtt_port(Node, Type) -> + {_IP, Port} = erpc:call(Node, emqx_config, get, [[listeners, Type, default, bind]]), + Port. + +%% to avoid it finishing too fast +with_some_sessions(Node, Fn) -> + erpc:call(Node, fun() -> + emqx_common_test_helpers:with_mock( + emqx_eviction_agent, + all_channels_count, + fun() -> 100 end, + Fn + ) + end). + +drain_exits([ClientPid | Rest]) -> + receive + {'EXIT', ClientPid, _Reason} -> + drain_exits(Rest) + after 1_000 -> + ct:pal("mailbox:\n ~p", [process_info(self(), messages)]), + ct:fail("pid ~p didn't die", [ClientPid]) + end; +drain_exits([]) -> + ok. + +emqtt_connect_many(Port, Count) -> + emqtt_connect_many(Port, Count, _StartN = 1). + +%% start many clients with mixed clean_start flags +emqtt_connect_many(Port, Count, StartN) -> + lists:map( + fun(N) -> + NBin = integer_to_binary(N), + ClientId = <<"client-", NBin/binary>>, + CleanStart = N rem 2 == 0, + {ok, C} = emqtt_connect([{clientid, ClientId}, {clean_start, CleanStart}, {port, Port}]), + C + end, + lists:seq(StartN, StartN + Count - 1) + ). + +%%-------------------------------------------------------------------- +%% Test Cases : one node +%%-------------------------------------------------------------------- + +t_agent_busy(Config) -> + [Node] = ?config(cluster_nodes, Config), + + ok = rpc:call(Node, emqx_eviction_agent, enable, [other_rebalance, undefined]), + + erpc:call(Node, fun() -> + ?assertExit( + {{{bad_nodes, [{Node, {error, eviction_agent_busy}}]}, _}, _}, + emqx_node_rebalance_purge:start(opts(Config)) + ) + end), + + ok. + +t_already_started(Config) -> + [Node] = ?config(cluster_nodes, Config), + with_some_sessions(Node, fun() -> + ok = emqx_node_rebalance_purge:start(opts(Config)), + + ?assertEqual( + {error, already_started}, + emqx_node_rebalance_purge:start(opts(Config)) + ), + + ?assertEqual( + ok, + emqx_node_rebalance_purge:stop() + ), + + ok + end), + ok. + +t_not_started(Config) -> + [Node] = ?config(cluster_nodes, Config), + + ?assertEqual( + {error, not_started}, + rpc:call(Node, emqx_node_rebalance_purge, stop, []) + ). + +t_start(Config) -> + [Node] = ?config(cluster_nodes, Config), + Port = get_mqtt_port(Node, tcp), + + with_some_sessions(Node, fun() -> + process_flag(trap_exit, true), + ok = snabbkaffe:start_trace(), + + ?assertEqual( + ok, + emqx_node_rebalance_purge:start(opts(Config)) + ), + ?assertEqual({error, {use_another_server, #{}}}, emqtt_try_connect([{port, Port}])), + ok + end), + ok. + +t_non_persistence(Config) -> + [Node] = ?config(cluster_nodes, Config), + Port = get_mqtt_port(Node, tcp), + + %% to avoid it finishing too fast + with_some_sessions(Node, fun() -> + process_flag(trap_exit, true), + ok = snabbkaffe:start_trace(), + + ?assertEqual( + ok, + emqx_node_rebalance_purge:start(opts(Config)) + ), + + ?assertMatch( + {error, {use_another_server, #{}}}, + emqtt_try_connect([{port, Port}]) + ), + + ok = supervisor:terminate_child(emqx_node_rebalance_sup, emqx_node_rebalance_purge), + {ok, _} = supervisor:restart_child(emqx_node_rebalance_sup, emqx_node_rebalance_purge), + + ?assertMatch( + ok, + emqtt_try_connect([{port, Port}]) + ), + ?assertMatch(disabled, emqx_node_rebalance_purge:status()), + ok + end), + ok. + +t_unknown_messages(Config) -> + process_flag(trap_exit, true), + + [Node] = ?config(cluster_nodes, Config), + + ok = rpc:call(Node, emqx_node_rebalance_purge, start, [opts(Config)]), + Pid = rpc:call(Node, erlang, whereis, [emqx_node_rebalance_purge]), + Pid ! unknown, + ok = gen_server:cast(Pid, unknown), + ?assertEqual( + ignored, + gen_server:call(Pid, unknown) + ), + + ok. + +%%-------------------------------------------------------------------- +%% Test Cases : two nodes +%%-------------------------------------------------------------------- + +t_already_started_two(Config) -> + [Node1, _Node2] = ?config(cluster_nodes, Config), + with_some_sessions(Node1, fun() -> + ok = emqx_node_rebalance_purge:start(opts(Config)), + + ?assertEqual( + {error, already_started}, + emqx_node_rebalance_purge:start(opts(Config)) + ), + + ?assertEqual( + ok, + emqx_node_rebalance_purge:stop() + ), + + ok + end), + ?assertEqual( + {error, not_started}, + rpc:call(Node1, emqx_node_rebalance_purge, stop, []) + ), + + ok. + +t_session_purged(Config) -> + process_flag(trap_exit, true), + + [Node1, Node2] = ?config(cluster_nodes, Config), + Port1 = get_mqtt_port(Node1, tcp), + Port2 = get_mqtt_port(Node2, tcp), + + %% N.B.: it's important to have an asymmetric number of clients for this test, as + %% otherwise the scenario might happen to finish successfully due to the wrong + %% reasons! + NumClientsNode1 = 5, + NumClientsNode2 = 35, + Node1Clients = emqtt_connect_many(Port1, NumClientsNode1, _StartN1 = 1), + Node2Clients = emqtt_connect_many(Port2, NumClientsNode2, _StartN2 = 21), + lists:foreach( + fun(C) -> + ClientId = proplists:get_value(clientid, emqtt:info(C)), + Topic = emqx_topic:join([<<"t">>, ClientId]), + Props = #{}, + Payload = ClientId, + Opts = [{retain, true}], + ok = emqtt:publish(C, Topic, Props, Payload, Opts), + DelayedTopic = emqx_topic:join([<<"$delayed/120">>, Topic]), + ok = emqtt:publish(C, DelayedTopic, Payload), + {ok, _, [?RC_GRANTED_QOS_0]} = emqtt:subscribe(C, Topic), + ok + end, + Node1Clients ++ Node2Clients + ), + + ?assertEqual(40, erpc:call(Node2, emqx_retainer, retained_count, [])), + ?assertEqual(NumClientsNode1, erpc:call(Node1, emqx_delayed, delayed_count, [])), + ?assertEqual(NumClientsNode2, erpc:call(Node2, emqx_delayed, delayed_count, [])), + + {ok, SRef0} = snabbkaffe:subscribe( + ?match_event(#{?snk_kind := "cluster_purge_done"}), + 15_000 + ), + ok = rpc:call(Node1, emqx_node_rebalance_purge, start, [opts(Config)]), + {ok, _} = snabbkaffe:receive_events(SRef0), + + ?assertEqual([], erpc:call(Node1, emqx_cm, all_channels, [])), + ?assertEqual([], erpc:call(Node2, emqx_cm, all_channels, [])), + ?assertEqual(0, erpc:call(Node1, emqx_retainer, retained_count, [])), + ?assertEqual(0, erpc:call(Node2, emqx_retainer, retained_count, [])), + ?assertEqual(0, erpc:call(Node1, emqx_delayed, delayed_count, [])), + ?assertEqual(0, erpc:call(Node2, emqx_delayed, delayed_count, [])), + + ok = drain_exits(Node1Clients ++ Node2Clients), + + ok. diff --git a/apps/emqx_node_rebalance/test/emqx_node_rebalance_status_SUITE.erl b/apps/emqx_node_rebalance/test/emqx_node_rebalance_status_SUITE.erl index 167c37d8c..f9c50b761 100644 --- a/apps/emqx_node_rebalance/test/emqx_node_rebalance_status_SUITE.erl +++ b/apps/emqx_node_rebalance/test/emqx_node_rebalance_status_SUITE.erl @@ -57,7 +57,7 @@ end_per_suite(Config) -> t_cluster_status(Config) -> [CoreNode, ReplicantNode] = ?config(cluster_nodes, Config), - ok = emqx_node_rebalance_api_proto_v1:node_rebalance_evacuation_start(CoreNode, #{}), + ok = emqx_node_rebalance_api_proto_v2:node_rebalance_evacuation_start(CoreNode, #{}), ?assertMatch( #{evacuations := [_], rebalances := []}, diff --git a/changes/ee/feat-11447.en.md b/changes/ee/feat-11447.en.md new file mode 100644 index 000000000..caa808861 --- /dev/null +++ b/changes/ee/feat-11447.en.md @@ -0,0 +1 @@ +Added CLI command to wipe session and retained message data on the whole cluster. diff --git a/rel/i18n/emqx_node_rebalance_api.hocon b/rel/i18n/emqx_node_rebalance_api.hocon index 8b598134a..00346d52c 100644 --- a/rel/i18n/emqx_node_rebalance_api.hocon +++ b/rel/i18n/emqx_node_rebalance_api.hocon @@ -42,6 +42,18 @@ load_rebalance_evacuation_stop.desc: load_rebalance_evacuation_stop.label: """Stop evacuation""" +cluster_purge_start.desc: +"""Start purge process""" + +cluster_purge_start.label: +"""Start purge""" + +cluster_purge_stop.desc: +"""Stop purge process""" + +cluster_purge_stop.label: +"""Stop purge""" + param_node.desc: """Node name""" @@ -150,6 +162,12 @@ local_status_session_eviction_rate.desc: local_status_session_eviction_rate.label: """Session eviction rate""" +local_status_purge_rate.desc: +"""The rate of purging sessions, in sessions per second""" + +local_status_purge_rate.label: +"""Session purge rate""" + local_status_connection_goal.desc: """The number of connections that the node should have after the rebalance/evacuation process"""