Merge pull request #11447 from thalesmg/cluster-wipe-20230814

feat: cluster purge
Thales Macedo Garitezi 2023-08-28 12:34:35 -03:00 committed by GitHub
commit 1e7cde5712
27 changed files with 1521 additions and 67 deletions
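The purge flow added in this PR is coordinated by the new emqx_node_rebalance_purge state machine and can be driven directly from a remote shell. A minimal Erlang sketch based on the API introduced below (the purge_rate of 10 is only illustrative; the module defaults it to 500):

%% Start a cluster-wide purge at 10 sessions per second.
ok = emqx_node_rebalance_purge:start(#{purge_rate => 10}).
%% Poll progress: returns disabled | {enabled, Stats}, where Stats carries
%% state, purge_rate, initial_sessions and current_sessions.
{enabled, Stats} = emqx_node_rebalance_purge:status().
%% Abort early if needed; returns {error, not_started} when no purge is running.
ok = emqx_node_rebalance_purge:stop().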

View File

@ -15,7 +15,9 @@
{emqx_conf,3}.
{emqx_dashboard,1}.
{emqx_delayed,1}.
{emqx_delayed,2}.
{emqx_eviction_agent,1}.
{emqx_eviction_agent,2}.
{emqx_exhook,1}.
{emqx_ft_storage_exporter_fs,1}.
{emqx_ft_storage_fs,1}.
@ -37,9 +39,13 @@
{emqx_mgmt_trace,1}.
{emqx_mgmt_trace,2}.
{emqx_node_rebalance,1}.
{emqx_node_rebalance,2}.
{emqx_node_rebalance_api,1}.
{emqx_node_rebalance_api,2}.
{emqx_node_rebalance_evacuation,1}.
{emqx_node_rebalance_purge,1}.
{emqx_node_rebalance_status,1}.
{emqx_node_rebalance_status,2}.
{emqx_persistent_session,1}.
{emqx_persistent_session_ds,1}.
{emqx_plugins,1}.

View File

@ -78,6 +78,7 @@
%% Client management
-export([
all_channels_table/1,
channel_with_session_table/1,
live_connection_table/1
]).
@ -593,6 +594,26 @@ channel_with_session_table(ConnModuleList) ->
sets:is_element(ConnModule, ConnModules)
]).
%% @doc Get clientinfo for all clients, regardless of whether they use clean start or not.
all_channels_table(ConnModuleList) ->
Ms = ets:fun2ms(
fun({{ClientId, _ChanPid}, Info, _Stats}) ->
{ClientId, Info}
end
),
Table = ets:table(?CHAN_INFO_TAB, [{traverse, {select, Ms}}]),
ConnModules = sets:from_list(ConnModuleList, [{version, 2}]),
qlc:q([
{ClientId, ConnState, ConnInfo, ClientInfo}
|| {ClientId, #{
conn_state := ConnState,
clientinfo := ClientInfo,
conninfo := #{conn_mod := ConnModule} = ConnInfo
}} <-
Table,
sets:is_element(ConnModule, ConnModules)
]).
%% @doc Get a query handle for all local connections
live_connection_table(ConnModules) ->
Ms = lists:map(fun live_connection_ms/1, ConnModules),

View File

@ -1,6 +1,6 @@
{application, emqx_eviction_agent, [
{description, "EMQX Eviction Agent"},
{vsn, "5.1.0"},
{vsn, "5.1.1"},
{registered, [
emqx_eviction_agent_sup,
emqx_eviction_agent,

View File

@ -18,14 +18,19 @@
disable/1,
status/0,
connection_count/0,
all_channels_count/0,
session_count/0,
session_count/1,
evict_connections/1,
evict_sessions/2,
evict_sessions/3,
purge_sessions/1,
evict_session_channel/3
]).
%% RPC targets
-export([all_local_channels_count/0]).
-behaviour(gen_server).
-export([
@ -113,6 +118,14 @@ evict_sessions(N, Nodes, ConnState) when
{error, disabled}
end.
purge_sessions(N) ->
case enable_status() of
{enabled, _Kind, _ServerReference} ->
ok = do_purge_sessions(N);
disabled ->
{error, disabled}
end.
%%--------------------------------------------------------------------
%% gen_server callbacks
%%--------------------------------------------------------------------
@ -231,6 +244,33 @@ channel_with_session_table(RequiredConnState) ->
RequiredConnState =:= ConnState
]).
-spec all_channels_count() -> non_neg_integer().
all_channels_count() ->
Nodes = emqx:running_nodes(),
Timeout = 15_000,
Results = emqx_eviction_agent_proto_v2:all_channels_count(Nodes, Timeout),
NodeResults = lists:zip(Nodes, Results),
Errors = lists:filter(
fun
({_Node, {ok, _}}) -> false;
({_Node, _Err}) -> true
end,
NodeResults
),
Errors =/= [] andalso
?SLOG(
warning,
#{
msg => "error_collecting_all_channels_count",
errors => maps:from_list(Errors)
}
),
lists:sum([N || {ok, N} <- Results]).
-spec all_local_channels_count() -> non_neg_integer().
all_local_channels_count() ->
table_count(emqx_cm:all_channels_table(?CONN_MODULES)).
session_count() ->
session_count(any).
@ -247,6 +287,17 @@ take_connections(N) ->
ok = qlc:delete_cursor(ChanPidCursor),
ChanPids.
take_channels(N) ->
QH = qlc:q([
{ClientId, ConnInfo, ClientInfo}
|| {ClientId, _, ConnInfo, ClientInfo} <-
emqx_cm:all_channels_table(?CONN_MODULES)
]),
ChanPidCursor = qlc:cursor(QH),
Channels = qlc:next_answers(ChanPidCursor, N),
ok = qlc:delete_cursor(ChanPidCursor),
Channels.
take_channel_with_sessions(N, ConnState) ->
ChanPidCursor = qlc:cursor(channel_with_session_table(ConnState)),
Channels = qlc:next_answers(ChanPidCursor, N),
@ -283,7 +334,7 @@ evict_session_channel(Nodes, ClientId, ConnInfo, ClientInfo) ->
client_info => ClientInfo
}
),
case emqx_eviction_agent_proto_v1:evict_session_channel(Node, ClientId, ConnInfo, ClientInfo) of
case emqx_eviction_agent_proto_v2:evict_session_channel(Node, ClientId, ConnInfo, ClientInfo) of
{badrpc, Reason} ->
?SLOG(
error,
@ -344,5 +395,14 @@ disconnect_channel(ChanPid, ServerReference) ->
'Server-Reference' => ServerReference
}}.
do_purge_sessions(N) when N > 0 ->
Channels = take_channels(N),
ok = lists:foreach(
fun({ClientId, _ConnInfo, _ClientInfo}) ->
emqx_cm:discard_session(ClientId)
end,
Channels
).
select_random(List) when length(List) > 0 ->
lists:nth(rand:uniform(length(List)), List).

View File

@ -0,0 +1,35 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_eviction_agent_proto_v2).
-behaviour(emqx_bpapi).
-export([
introduced_in/0,
evict_session_channel/4,
%% Introduced in v2:
all_channels_count/2
]).
-include_lib("emqx/include/bpapi.hrl").
introduced_in() ->
"5.2.1".
-spec evict_session_channel(
node(),
emqx_types:clientid(),
emqx_types:conninfo(),
emqx_types:clientinfo()
) -> supervisor:startchild_err() | emqx_rpc:badrpc().
evict_session_channel(Node, ClientId, ConnInfo, ClientInfo) ->
rpc:call(Node, emqx_eviction_agent, evict_session_channel, [ClientId, ConnInfo, ClientInfo]).
%% Introduced in v2:
-spec all_channels_count([node()], time:time()) -> emqx_rpc:erpc_multicall(non_neg_integer()).
all_channels_count(Nodes, Timeout) ->
erpc:multicall(Nodes, emqx_eviction_agent, all_local_channels_count, [], Timeout).

View File

@ -9,6 +9,7 @@
emqtt_connect/1,
emqtt_connect/2,
emqtt_connect_many/2,
emqtt_connect_many/3,
stop_many/1,
emqtt_try_connect/1,
@ -42,6 +43,9 @@ emqtt_connect(Opts) ->
end.
emqtt_connect_many(Port, Count) ->
emqtt_connect_many(Port, Count, _StartN = 1).
emqtt_connect_many(Port, Count, StartN) ->
lists:map(
fun(N) ->
NBin = integer_to_binary(N),
@ -49,7 +53,7 @@ emqtt_connect_many(Port, Count) ->
{ok, C} = emqtt_connect([{clientid, ClientId}, {clean_start, false}, {port, Port}]),
C
end,
lists:seq(1, Count)
lists:seq(StartN, StartN + Count - 1)
).
stop_many(Clients) ->

View File

@ -45,18 +45,22 @@
code_change/3
]).
%% gen_server callbacks
%% API
-export([
load/0,
unload/0,
load_or_unload/1,
get_conf/1,
update_config/1,
delayed_count/0,
list/1,
get_delayed_message/1,
get_delayed_message/2,
delete_delayed_message/1,
delete_delayed_message/2,
clear_all/0,
%% rpc target
clear_all_local/0,
cluster_list/1
]).
@ -167,6 +171,9 @@ unload() ->
load_or_unload(Bool) ->
gen_server:call(?SERVER, {do_load_or_unload, Bool}).
-spec delayed_count() -> non_neg_integer().
delayed_count() -> mnesia:table_info(?TAB, size).
list(Params) ->
emqx_mgmt_api:paginate(?TAB, Params, ?FORMAT_FUN).
@ -243,7 +250,7 @@ get_delayed_message(Id) ->
get_delayed_message(Node, Id) when Node =:= node() ->
get_delayed_message(Id);
get_delayed_message(Node, Id) ->
emqx_delayed_proto_v1:get_delayed_message(Node, Id).
emqx_delayed_proto_v2:get_delayed_message(Node, Id).
-spec delete_delayed_message(binary()) -> with_id_return().
delete_delayed_message(Id) ->
@ -258,7 +265,19 @@ delete_delayed_message(Id) ->
delete_delayed_message(Node, Id) when Node =:= node() ->
delete_delayed_message(Id);
delete_delayed_message(Node, Id) ->
emqx_delayed_proto_v1:delete_delayed_message(Node, Id).
emqx_delayed_proto_v2:delete_delayed_message(Node, Id).
-spec clear_all() -> ok.
clear_all() ->
Nodes = emqx:running_nodes(),
_ = emqx_delayed_proto_v2:clear_all(Nodes),
ok.
%% rpc target
-spec clear_all_local() -> ok.
clear_all_local() ->
_ = mria:clear_table(?TAB),
ok.
update_config(Config) ->
emqx_conf:update([delayed], Config, #{rawconf_with_defaults => true, override_to => cluster}).
@ -408,9 +427,6 @@ do_publish(Key = {Ts, _Id}, Now, Acc) when Ts =< Now ->
end,
do_publish(mnesia:dirty_next(?TAB, Key), Now, [Key | Acc]).
-spec delayed_count() -> non_neg_integer().
delayed_count() -> mnesia:table_info(?TAB, size).
do_load_or_unload(true, State) ->
emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}, ?HP_DELAY_PUB),
State;

View File

@ -0,0 +1,48 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_delayed_proto_v2).
-behaviour(emqx_bpapi).
-export([
introduced_in/0,
get_delayed_message/2,
delete_delayed_message/2,
%% Introduced in v2:
clear_all/1
]).
-include_lib("emqx/include/bpapi.hrl").
introduced_in() ->
"5.2.1".
-spec get_delayed_message(node(), binary()) ->
emqx_delayed:with_id_return(map()) | emqx_rpc:badrpc().
get_delayed_message(Node, Id) ->
rpc:call(Node, emqx_delayed, get_delayed_message, [Id]).
-spec delete_delayed_message(node(), binary()) -> emqx_delayed:with_id_return() | emqx_rpc:badrpc().
delete_delayed_message(Node, Id) ->
rpc:call(Node, emqx_delayed, delete_delayed_message, [Id]).
%% Introduced in v2:
-spec clear_all([node()]) -> emqx_rpc:erpc_multicall(ok).
clear_all(Nodes) ->
erpc:multicall(Nodes, emqx_delayed, clear_all_local, []).

View File

@ -164,15 +164,15 @@ t_cluster(_) ->
?assertMatch(
{ok, _},
emqx_delayed_proto_v1:get_delayed_message(node(), Id)
emqx_delayed_proto_v2:get_delayed_message(node(), Id)
),
%% The 'local' and the 'fake-remote' values should be the same;
%% however, there is a race condition, so we just assert that both are 'ok' tuples
?assertMatch({ok, _}, emqx_delayed:get_delayed_message(Id)),
?assertMatch({ok, _}, emqx_delayed_proto_v1:get_delayed_message(node(), Id)),
?assertMatch({ok, _}, emqx_delayed_proto_v2:get_delayed_message(node(), Id)),
ok = emqx_delayed_proto_v1:delete_delayed_message(node(), Id),
ok = emqx_delayed_proto_v2:delete_delayed_message(node(), Id),
?assertMatch(
{error, _},

View File

@ -1,6 +1,6 @@
{application, emqx_node_rebalance, [
{description, "EMQX Node Rebalance"},
{vsn, "5.0.4"},
{vsn, "5.0.5"},
{registered, [
emqx_node_rebalance_sup,
emqx_node_rebalance,

View File

@ -81,7 +81,7 @@ start_link() ->
-spec available_nodes(list(node())) -> list(node()).
available_nodes(Nodes) when is_list(Nodes) ->
{Available, _} = emqx_node_rebalance_proto_v1:available_nodes(Nodes),
{Available, _} = emqx_node_rebalance_proto_v2:available_nodes(Nodes),
lists:filter(fun is_atom/1, Available).
%%--------------------------------------------------------------------
@ -370,7 +370,7 @@ avg(List) when length(List) >= 1 ->
lists:sum(List) / length(List).
multicall(Nodes, F, A) ->
case apply(emqx_node_rebalance_proto_v1, F, [Nodes | A]) of
case apply(emqx_node_rebalance_proto_v2, F, [Nodes | A]) of
{Results, []} ->
case lists:partition(fun is_ok/1, lists:zip(Nodes, Results)) of
{OkResults, []} ->

View File

@ -14,7 +14,9 @@
-export([
start_link/0,
enable/1,
enable/2,
disable/1,
disable/2,
status/0
]).
@ -40,11 +42,21 @@ start_link() ->
-spec enable(pid()) -> ok_or_error(already_enabled | eviction_agent_busy).
enable(CoordinatorPid) ->
gen_server:call(?MODULE, {enable, CoordinatorPid}).
enable(CoordinatorPid, ?ENABLE_KIND).
-spec enable(pid(), emqx_eviction_agent:kind()) ->
ok_or_error(already_enabled | eviction_agent_busy).
enable(CoordinatorPid, Kind) ->
gen_server:call(?MODULE, {enable, CoordinatorPid, Kind}).
-spec disable(pid()) -> ok_or_error(already_disabled | invalid_coordinator).
disable(CoordinatorPid) ->
gen_server:call(?MODULE, {disable, CoordinatorPid}).
disable(CoordinatorPid, ?ENABLE_KIND).
-spec disable(pid(), emqx_eviction_agent:kind()) ->
ok_or_error(already_disabled | invalid_coordinator).
disable(CoordinatorPid, Kind) ->
gen_server:call(?MODULE, {disable, CoordinatorPid, Kind}).
-spec status() -> status().
status() ->
@ -57,7 +69,7 @@ status() ->
init([]) ->
{ok, #{}}.
handle_call({enable, CoordinatorPid}, _From, St) ->
handle_call({enable, CoordinatorPid, Kind}, _From, St) ->
case St of
#{coordinator_pid := _Pid} ->
{reply, {error, already_enabled}, St};
@ -65,7 +77,7 @@ handle_call({enable, CoordinatorPid}, _From, St) ->
true = link(CoordinatorPid),
EvictionAgentPid = whereis(emqx_eviction_agent),
true = link(EvictionAgentPid),
case emqx_eviction_agent:enable(?ENABLE_KIND, undefined) of
case emqx_eviction_agent:enable(Kind, undefined) of
ok ->
{reply, ok, #{
coordinator_pid => CoordinatorPid,
@ -77,13 +89,13 @@ handle_call({enable, CoordinatorPid}, _From, St) ->
{reply, {error, eviction_agent_busy}, St}
end
end;
handle_call({disable, CoordinatorPid}, _From, St) ->
handle_call({disable, CoordinatorPid, Kind}, _From, St) ->
case St of
#{
coordinator_pid := CoordinatorPid,
eviction_agent_pid := EvictionAgentPid
} ->
_ = emqx_eviction_agent:disable(?ENABLE_KIND),
_ = emqx_eviction_agent:disable(Kind),
true = unlink(EvictionAgentPid),
true = unlink(CoordinatorPid),
NewSt = maps:without(

View File

@ -31,7 +31,9 @@
'/load_rebalance/:node/start'/2,
'/load_rebalance/:node/stop'/2,
'/load_rebalance/:node/evacuation/start'/2,
'/load_rebalance/:node/evacuation/stop'/2
'/load_rebalance/:node/evacuation/stop'/2,
'/load_rebalance/:node/purge/start'/2,
'/load_rebalance/:node/purge/stop'/2
]).
%% Schema examples
@ -67,6 +69,9 @@ paths() ->
"/load_rebalance/:node/stop",
"/load_rebalance/:node/evacuation/start",
"/load_rebalance/:node/evacuation/stop"
%% TODO: uncomment after we officially release the feature.
%% "/load_rebalance/:node/purge/start",
%% "/load_rebalance/:node/purge/stop"
].
schema("/load_rebalance/status") ->
@ -176,6 +181,42 @@ schema("/load_rebalance/:node/evacuation/stop") ->
}
}
}.
%% TODO: uncomment after we officially release the feature.
%% schema("/load_rebalance/:node/purge/start") ->
%% #{
%% 'operationId' => '/load_rebalance/:node/purge/start',
%% post => #{
%% tags => [<<"load_rebalance">>],
%% summary => <<"Start purge on the whole cluster">>,
%% description => ?DESC("cluster_purge_start"),
%% parameters => [param_node()],
%% 'requestBody' =>
%% emqx_dashboard_swagger:schema_with_examples(
%% ref(purge_start),
%% purge_example()
%% ),
%% responses => #{
%% 200 => response_schema(),
%% 400 => error_codes([?BAD_REQUEST], <<"Bad Request">>),
%% 404 => error_codes([?NOT_FOUND], <<"Not Found">>)
%% }
%% }
%% };
%% schema("/load_rebalance/:node/purge/stop") ->
%% #{
%% 'operationId' => '/load_rebalance/:node/purge/stop',
%% post => #{
%% tags => [<<"load_rebalance">>],
%% summary => <<"Stop purge on the whole cluster">>,
%% description => ?DESC("cluster_purge_stop"),
%% parameters => [param_node()],
%% responses => #{
%% 200 => response_schema(),
%% 400 => error_codes([?BAD_REQUEST], <<"Bad Request">>),
%% 404 => error_codes([?NOT_FOUND], <<"Not Found">>)
%% }
%% }
%% }.
%%--------------------------------------------------------------------
%% Handlers
@ -188,16 +229,20 @@ schema("/load_rebalance/:node/evacuation/stop") ->
{rebalance, Stats} ->
{200, format_status(rebalance, Stats)};
{evacuation, Stats} ->
{200, format_status(evacuation, Stats)}
{200, format_status(evacuation, Stats)};
{purge, Stats} ->
{200, format_status(purge, Stats)}
end.
'/load_rebalance/global_status'(get, #{}) ->
#{
evacuations := Evacuations,
purges := Purges,
rebalances := Rebalances
} = emqx_node_rebalance_status:global_status(),
{200, #{
evacuations => format_as_map_list(Evacuations),
purges => format_as_map_list(Purges),
rebalances => format_as_map_list(Rebalances)
}}.
@ -214,7 +259,7 @@ schema("/load_rebalance/:node/evacuation/stop") ->
Params1 = translate(rebalance_start, Params0),
with_nodes_at_key(nodes, Params1, fun(Params2) ->
wrap_rpc(
Node, emqx_node_rebalance_api_proto_v1:node_rebalance_start(Node, Params2)
Node, emqx_node_rebalance_api_proto_v2:node_rebalance_start(Node, Params2)
)
end)
end).
@ -222,7 +267,7 @@ schema("/load_rebalance/:node/evacuation/stop") ->
'/load_rebalance/:node/stop'(post, #{bindings := #{node := NodeBin}}) ->
emqx_utils_api:with_node(NodeBin, fun(Node) ->
wrap_rpc(
Node, emqx_node_rebalance_api_proto_v1:node_rebalance_stop(Node)
Node, emqx_node_rebalance_api_proto_v2:node_rebalance_stop(Node)
)
end).
@ -234,7 +279,7 @@ schema("/load_rebalance/:node/evacuation/stop") ->
with_nodes_at_key(migrate_to, Params1, fun(Params2) ->
wrap_rpc(
Node,
emqx_node_rebalance_api_proto_v1:node_rebalance_evacuation_start(
emqx_node_rebalance_api_proto_v2:node_rebalance_evacuation_start(
Node, Params2
)
)
@ -244,7 +289,27 @@ schema("/load_rebalance/:node/evacuation/stop") ->
'/load_rebalance/:node/evacuation/stop'(post, #{bindings := #{node := NodeBin}}) ->
emqx_utils_api:with_node(NodeBin, fun(Node) ->
wrap_rpc(
Node, emqx_node_rebalance_api_proto_v1:node_rebalance_evacuation_stop(Node)
Node, emqx_node_rebalance_api_proto_v2:node_rebalance_evacuation_stop(Node)
)
end).
'/load_rebalance/:node/purge/start'(post, #{
bindings := #{node := NodeBin}, body := Params0
}) ->
emqx_utils_api:with_node(NodeBin, fun(Node) ->
Params1 = translate(purge_start, Params0),
wrap_rpc(
Node,
emqx_node_rebalance_api_proto_v2:node_rebalance_purge_start(
Node, Params1
)
)
end).
'/load_rebalance/:node/purge/stop'(post, #{bindings := #{node := NodeBin}}) ->
emqx_utils_api:with_node(NodeBin, fun(Node) ->
wrap_rpc(
Node, emqx_node_rebalance_api_proto_v2:node_rebalance_purge_stop(Node)
)
end).
@ -483,6 +548,17 @@ fields(rebalance_evacuation_start) ->
}
)}
];
fields(purge_start) ->
[
{"purge_rate",
mk(
pos_integer(),
#{
desc => ?DESC(purge_rate),
required => false
}
)}
];
fields(local_status_disabled) ->
[
{"status",
@ -687,6 +763,38 @@ fields(global_evacuation_status) ->
}
)}
];
fields(global_purge_status) ->
without(
[
"status",
"process",
"connection_eviction_rate",
"session_eviction_rate",
"connection_goal",
"disconnected_session_goal",
"session_recipients",
"recipients"
],
fields(local_status_enabled)
) ++
[
{"purge_rate",
mk(
pos_integer(),
#{
desc => ?DESC(local_status_purge_rate),
required => false
}
)},
{"node",
mk(
binary(),
#{
desc => ?DESC(evacuation_status_node),
required => true
}
)}
];
fields(global_status) ->
[
{"evacuations",
@ -697,6 +805,14 @@ fields(global_status) ->
required => false
}
)},
{"purges",
mk(
hoconsc:array(ref(global_purge_status)),
#{
desc => ?DESC(global_status_purges),
required => false
}
)},
{"rebalances",
mk(
hoconsc:array(ref(global_coordinator_status)),
@ -735,6 +851,10 @@ rebalance_evacuation_example() ->
}
}.
%% TODO: uncomment after we officially release the feature.
%% purge_example() ->
%% #{purge => #{purge_rate => 100}}.
local_status_response_schema() ->
hoconsc:union([ref(local_status_disabled), ref(local_status_enabled)]).

View File

@ -29,6 +29,15 @@ cli(["start" | StartArgs]) ->
emqx_ctl:print("Rebalance is already enabled~n"),
false
end;
{purge, Opts} ->
case emqx_node_rebalance_purge:start(Opts) of
ok ->
emqx_ctl:print("Rebalance(purge) started~n"),
true;
{error, Reason} ->
emqx_ctl:print("Rebalance(purge) start error: ~p~n", [Reason]),
false
end;
{rebalance, Opts} ->
case emqx_node_rebalance:start(Opts) of
ok ->
@ -55,6 +64,7 @@ cli(["node-status"]) ->
cli(["status"]) ->
#{
evacuations := Evacuations,
purges := Purges,
rebalances := Rebalances
} = emqx_node_rebalance_status:global_status(),
lists:foreach(
@ -69,6 +79,18 @@ cli(["status"]) ->
end,
Evacuations
),
lists:foreach(
fun({Node, Status}) ->
emqx_ctl:print(
"--------------------------------------------------------------------~n"
),
emqx_ctl:print(
"Node ~p: purge~n~s",
[Node, emqx_node_rebalance_status:format_local_status(Status)]
)
end,
Purges
),
lists:foreach(
fun({Node, Status}) ->
emqx_ctl:print(
@ -82,10 +104,14 @@ cli(["status"]) ->
Rebalances
);
cli(["stop"]) ->
case emqx_node_rebalance_evacuation:status() of
{enabled, _} ->
ok = emqx_node_rebalance_evacuation:stop(),
emqx_ctl:print("Rebalance(evacuation) stopped~n"),
Checks =
[
{evacuation, fun emqx_node_rebalance_evacuation:status/0,
fun emqx_node_rebalance_evacuation:stop/0},
{purge, fun emqx_node_rebalance_purge:status/0, fun emqx_node_rebalance_purge:stop/0}
],
case do_stop(Checks) of
ok ->
true;
disabled ->
case emqx_node_rebalance:status() of
@ -112,6 +138,13 @@ cli(_) ->
"Start current node evacuation with optional server redirect to the specified servers"
},
%% TODO: uncomment after we officially release the feature.
%% {
%% "rebalance start --purge \\\n"
%% " [--purge-rate CountPerSec]",
%% "Start purge on all running nodes in the cluster"
%% },
{
"rebalance start \\\n"
" [--nodes \"node1@host1 node2@host2\"] \\\n"
@ -140,7 +173,11 @@ cli(_) ->
node_status(NodeStatus) ->
case NodeStatus of
{Process, Status} when Process =:= evacuation orelse Process =:= rebalance ->
{Process, Status} when
Process =:= evacuation;
Process =:= purge;
Process =:= rebalance
->
emqx_ctl:print(
"Rebalance type: ~p~n~s~n",
[Process, emqx_node_rebalance_status:format_local_status(Status)]
@ -160,6 +197,13 @@ start_args(Args) ->
{error, _} = Error ->
Error
end;
{ok, #{"--purge" := true} = Collected} ->
case validate_purge(maps:to_list(Collected), #{}) of
{ok, Validated} ->
{purge, Validated};
{error, _} = Error ->
Error
end;
{ok, #{} = Collected} ->
case validate_rebalance(maps:to_list(Collected), #{}) of
{ok, Validated} ->
@ -180,6 +224,11 @@ collect_args(["--redirect-to", ServerReference | Args], Map) ->
collect_args(Args, Map#{"--redirect-to" => ServerReference});
collect_args(["--migrate-to", MigrateTo | Args], Map) ->
collect_args(Args, Map#{"--migrate-to" => MigrateTo});
%% purge
collect_args(["--purge" | Args], Map) ->
collect_args(Args, Map#{"--purge" => true});
collect_args(["--purge-rate", PurgeRate | Args], Map) ->
collect_args(Args, Map#{"--purge-rate" => PurgeRate});
%% rebalance
collect_args(["--nodes", Nodes | Args], Map) ->
collect_args(Args, Map#{"--nodes" => Nodes});
@ -239,6 +288,15 @@ validate_evacuation([{"--migrate-to", MigrateTo} | Rest], Map) ->
validate_evacuation(Rest, _Map) ->
{error, io_lib:format("unknown evacuation arguments: ~p", [Rest])}.
validate_purge([], Map) ->
{ok, Map};
validate_purge([{"--purge", _} | Rest], Map) ->
validate_purge(Rest, Map);
validate_purge([{"--purge-rate", _} | _] = Opts, Map) ->
validate_pos_int(purge_rate, Opts, Map, fun validate_purge/2);
validate_purge(Rest, _Map) ->
{error, io_lib:format("unknown purge arguments: ~p", [Rest])}.
validate_rebalance([], Map) ->
{ok, Map};
validate_rebalance([{"--wait-health-check", _} | _] = Opts, Map) ->
@ -306,3 +364,15 @@ strings_to_atoms([Str | Rest], Atoms, Invalid) ->
{error, _} ->
strings_to_atoms(Rest, Atoms, [Str | Invalid])
end.
do_stop([{Type, Check, Stop} | Rest]) ->
case Check() of
{enabled, _} ->
ok = Stop(),
emqx_ctl:print("Rebalance(~s) stopped~n", [Type]),
ok;
disabled ->
do_stop(Rest)
end;
do_stop([]) ->
disabled.

View File

@ -0,0 +1,233 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_node_rebalance_purge).
-include("emqx_node_rebalance.hrl").
-include_lib("emqx/include/logger.hrl").
-include_lib("emqx/include/types.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-export([
start/1,
status/0,
stop/0
]).
-export([start_link/0]).
-behaviour(gen_statem).
-export([
init/1,
callback_mode/0,
handle_event/4,
code_change/4
]).
-export_type([
start_opts/0,
start_error/0,
stop_error/0
]).
%%--------------------------------------------------------------------
%% APIs
%%--------------------------------------------------------------------
-define(DEFAULT_PURGE_RATE, 500).
-define(ENABLE_KIND, purge).
%% gen_statem states
-define(disabled, disabled).
-define(purging, purging).
-define(cleaning_data, cleaning_data).
-type start_opts() :: #{
purge_rate => pos_integer()
}.
-type start_error() :: already_started.
-type stop_error() :: not_started.
-type stats() :: #{
initial_sessions := non_neg_integer(),
current_sessions := non_neg_integer(),
purge_rate := pos_integer()
}.
-type status() :: {enabled, stats()} | disabled.
-spec start(start_opts()) -> ok_or_error(start_error()).
start(StartOpts) ->
Opts = maps:merge(default_opts(), StartOpts),
gen_statem:call(?MODULE, {start, Opts}).
-spec stop() -> ok_or_error(not_started).
stop() ->
gen_statem:call(?MODULE, stop).
-spec status() -> status().
status() ->
gen_statem:call(?MODULE, status).
-spec start_link() -> startlink_ret().
start_link() ->
gen_statem:start_link({local, ?MODULE}, ?MODULE, [], []).
%%--------------------------------------------------------------------
%% gen_statem callbacks
%%--------------------------------------------------------------------
callback_mode() -> handle_event_function.
%% states: disabled, purging, cleaning_data
init([]) ->
{ok, disabled, #{}}.
%% start
handle_event({call, From}, {start, Opts}, ?disabled, #{} = Data) ->
ok = enable_purge(),
?SLOG(warning, #{
msg => "cluster_purge_started",
opts => Opts
}),
NewData = init_data(Data, Opts),
{next_state, ?purging, NewData, [
{state_timeout, 0, purge},
{reply, From, ok}
]};
handle_event({call, From}, {start, _Opts}, _State, #{}) ->
{keep_state_and_data, [{reply, From, {error, already_started}}]};
%% stop
handle_event({call, From}, stop, ?disabled, #{}) ->
{keep_state_and_data, [{reply, From, {error, not_started}}]};
handle_event({call, From}, stop, _State, Data) ->
ok = disable_purge(),
?SLOG(warning, #{msg => "cluster_purge_stopped"}),
{next_state, disabled, deinit(Data), [{reply, From, ok}]};
%% status
handle_event({call, From}, status, ?disabled, #{}) ->
{keep_state_and_data, [{reply, From, disabled}]};
handle_event({call, From}, status, State, Data) ->
Stats = maps:with(
[
initial_sessions,
current_sessions,
purge_rate
],
Data
),
{keep_state_and_data, [
{reply, From, {enabled, Stats#{state => State}}}
]};
%% session purge
handle_event(
state_timeout,
purge,
?purging,
#{
purge_rate := PurgeRate
} = Data
) ->
case emqx_eviction_agent:all_channels_count() of
Sessions when Sessions > 0 ->
ok = purge_sessions(PurgeRate),
?tp(
warning,
"cluster_purge_evict_sessions",
#{
count => Sessions,
purge_rate => PurgeRate
}
),
NewData = Data#{current_sessions => Sessions},
{keep_state, NewData, [{state_timeout, ?EVICT_INTERVAL, purge}]};
_Sessions = 0 ->
NewData = Data#{current_sessions => 0},
?SLOG(warning, #{msg => "cluster_purge_evict_sessions_done"}),
{next_state, ?cleaning_data, NewData, [
{state_timeout, 0, clean_retained_messages}
]}
end;
handle_event(
state_timeout,
clean_retained_messages,
?cleaning_data,
Data
) ->
?SLOG(warning, #{msg => "cluster_purge_cleaning_data"}),
ok = emqx_retainer:clean(),
ok = emqx_delayed:clear_all(),
?tp(warning, "cluster_purge_done", #{}),
ok = disable_purge(),
?tp(warning, "cluster_purge_finished_successfully", #{}),
{next_state, ?disabled, deinit(Data)};
handle_event({call, From}, Msg, State, Data) ->
?SLOG(warning, #{msg => "unknown_call", call => Msg, state => State, data => Data}),
{keep_state_and_data, [{reply, From, ignored}]};
handle_event(info, Msg, State, Data) ->
?SLOG(warning, #{msg => "unknown_info", info => Msg, state => State, data => Data}),
keep_state_and_data;
handle_event(cast, Msg, State, Data) ->
?SLOG(warning, #{msg => "unknown_cast", cast => Msg, state => State, data => Data}),
keep_state_and_data.
code_change(_Vsn, State, Data, _Extra) ->
{ok, State, Data}.
%%--------------------------------------------------------------------
%% internal funs
%%--------------------------------------------------------------------
default_opts() ->
#{
purge_rate => ?DEFAULT_PURGE_RATE
}.
init_data(Data0, Opts) ->
Data1 = maps:merge(Data0, Opts),
SessCount = emqx_eviction_agent:session_count(),
Data1#{
initial_sessions => SessCount,
current_sessions => SessCount
}.
deinit(Data) ->
Keys =
[initial_sessions, current_sessions | maps:keys(default_opts())],
maps:without(Keys, Data).
multicall(Nodes, F, A) ->
case apply(emqx_node_rebalance_proto_v2, F, [Nodes | A]) of
{Results, []} ->
case lists:partition(fun is_ok/1, lists:zip(Nodes, Results)) of
{_OkResults, []} ->
ok;
{_, BadResults} ->
%% we crash on errors so that the coordinator death is signalled to
%% the eviction agents in the cluster.
error({bad_nodes, BadResults})
end;
{_, [_BadNode | _] = BadNodes} ->
error({bad_nodes, BadNodes})
end.
is_ok({_Node, {ok, _}}) -> true;
is_ok({_Node, ok}) -> true;
is_ok(_) -> false.
enable_purge() ->
Nodes = emqx:running_nodes(),
_ = multicall(Nodes, enable_rebalance_agent, [self(), ?ENABLE_KIND]),
ok.
disable_purge() ->
Nodes = emqx:running_nodes(),
_ = multicall(Nodes, disable_rebalance_agent, [self(), ?ENABLE_KIND]),
ok.
purge_sessions(PurgeRate) ->
Nodes = emqx:running_nodes(),
_ = multicall(Nodes, purge_sessions, [PurgeRate]),
ok.

View File

@ -15,6 +15,7 @@
%% For RPC
-export([
evacuation_status/0,
purge_status/0,
rebalance_status/0
]).
@ -22,11 +23,13 @@
%% APIs
%%--------------------------------------------------------------------
-spec local_status() -> disabled | {evacuation, map()} | {rebalance, map()}.
-spec local_status() -> disabled | {evacuation, map()} | {purge, map()} | {rebalance, map()}.
local_status() ->
case emqx_node_rebalance_evacuation:status() of
{enabled, Status} ->
{evacuation, evacuation(Status)};
Checks = [
{evacuation, fun emqx_node_rebalance_evacuation:status/0, fun evacuation/1},
{purge, fun emqx_node_rebalance_purge:status/0, fun purge/1}
],
case do_local_status(Checks) of
disabled ->
case emqx_node_rebalance_agent:status() of
{enabled, CoordinatorPid} ->
@ -38,28 +41,37 @@ local_status() ->
end;
disabled ->
disabled
end
end;
Res ->
Res
end.
-spec local_status(node()) -> disabled | {evacuation, map()} | {rebalance, map()}.
-spec local_status(node()) -> disabled | {evacuation, map()} | {purge, map()} | {rebalance, map()}.
local_status(Node) ->
emqx_node_rebalance_status_proto_v1:local_status(Node).
emqx_node_rebalance_status_proto_v2:local_status(Node).
-spec format_local_status(map()) -> iodata().
format_local_status(Status) ->
format_status(Status, local_status_field_format_order()).
-spec global_status() -> #{rebalances := [{node(), map()}], evacuations := [{node(), map()}]}.
-spec global_status() ->
#{
rebalances := [{node(), map()}],
evacuations := [{node(), map()}],
purges := [{node(), map()}]
}.
global_status() ->
Nodes = emqx:running_nodes(),
{RebalanceResults, _} = emqx_node_rebalance_status_proto_v1:rebalance_status(Nodes),
{RebalanceResults, _} = emqx_node_rebalance_status_proto_v2:rebalance_status(Nodes),
Rebalances = [
{Node, coordinator_rebalance(Status)}
|| {Node, {enabled, Status}} <- RebalanceResults
],
{EvacuatioResults, _} = emqx_node_rebalance_status_proto_v1:evacuation_status(Nodes),
Evacuations = [{Node, evacuation(Status)} || {Node, {enabled, Status}} <- EvacuatioResults],
#{rebalances => Rebalances, evacuations => Evacuations}.
{EvacuationResults, _} = emqx_node_rebalance_status_proto_v2:evacuation_status(Nodes),
Evacuations = [{Node, evacuation(Status)} || {Node, {enabled, Status}} <- EvacuationResults],
{PurgeResults, _} = emqx_node_rebalance_status_proto_v2:purge_status(Nodes),
Purges = [{Node, purge(Status)} || {Node, {enabled, Status}} <- PurgeResults],
#{rebalances => Rebalances, evacuations => Evacuations, purges => Purges}.
-spec format_coordinator_status(map()) -> iodata().
format_coordinator_status(Status) ->
@ -85,6 +97,17 @@ evacuation(Status) ->
}
}.
purge(Status) ->
#{
state => maps:get(state, Status),
purge_rate => maps:get(purge_rate, Status),
session_goal => 0,
stats => #{
initial_sessions => maps:get(initial_sessions, Status),
current_sessions => maps:get(current_sessions, Status)
}
}.
local_rebalance(#{donors := Donors} = Stats, Node) ->
case lists:member(Node, Donors) of
true -> {rebalance, donor_rebalance(Stats, Node)};
@ -159,6 +182,7 @@ local_status_field_format_order() ->
coordinator_node,
connection_eviction_rate,
session_eviction_rate,
purge_rate,
connection_goal,
session_goal,
disconnected_session_goal,
@ -201,6 +225,8 @@ format_local_status_field({connection_eviction_rate, ConnEvictRate}) ->
io_lib:format("Connection eviction rate: ~p connections/second~n", [ConnEvictRate]);
format_local_status_field({session_eviction_rate, SessEvictRate}) ->
io_lib:format("Session eviction rate: ~p sessions/second~n", [SessEvictRate]);
format_local_status_field({purge_rate, PurgeRate}) ->
io_lib:format("Purge rate: ~p sessions/second~n", [PurgeRate]);
format_local_status_field({connection_goal, ConnGoal}) ->
io_lib:format("Connection goal: ~p~n", [ConnGoal]);
format_local_status_field({session_goal, SessGoal}) ->
@ -231,8 +257,21 @@ format_local_stats(Stats) ->
)
].
do_local_status([{Type, Get, Cont} | Rest]) ->
case Get() of
disabled ->
do_local_status(Rest);
{enabled, Status} ->
{Type, Cont(Status)}
end;
do_local_status([]) ->
disabled.
evacuation_status() ->
{node(), emqx_node_rebalance_evacuation:status()}.
purge_status() ->
{node(), emqx_node_rebalance_purge:status()}.
rebalance_status() ->
{node(), emqx_node_rebalance:status()}.

View File

@ -15,6 +15,7 @@ start_link() ->
init([]) ->
Childs = [
child_spec(emqx_node_rebalance_purge, []),
child_spec(emqx_node_rebalance_evacuation, []),
child_spec(emqx_node_rebalance_agent, []),
child_spec(emqx_node_rebalance, [])

View File

@ -0,0 +1,59 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_node_rebalance_api_proto_v2).
-behaviour(emqx_bpapi).
-export([
introduced_in/0,
node_rebalance_evacuation_start/2,
node_rebalance_evacuation_stop/1,
node_rebalance_start/2,
node_rebalance_stop/1,
%% Introduced in v2:
node_rebalance_purge_start/2,
node_rebalance_purge_stop/1
]).
-include_lib("emqx/include/bpapi.hrl").
-include_lib("emqx/include/types.hrl").
introduced_in() ->
"5.2.1".
-spec node_rebalance_evacuation_start(node(), emqx_node_rebalance_evacuation:start_opts()) ->
emqx_rpc:badrpc() | ok_or_error(emqx_node_rebalance_evacuation:start_error()).
node_rebalance_evacuation_start(Node, #{} = Opts) ->
rpc:call(Node, emqx_node_rebalance_evacuation, start, [Opts]).
-spec node_rebalance_evacuation_stop(node()) ->
emqx_rpc:badrpc() | ok_or_error(not_started).
node_rebalance_evacuation_stop(Node) ->
rpc:call(Node, emqx_node_rebalance_evacuation, stop, []).
-spec node_rebalance_start(node(), emqx_node_rebalance:start_opts()) ->
emqx_rpc:badrpc() | ok_or_error(emqx_node_rebalance:start_error()).
node_rebalance_start(Node, Opts) ->
rpc:call(Node, emqx_node_rebalance, start, [Opts]).
-spec node_rebalance_stop(node()) ->
emqx_rpc:badrpc() | ok_or_error(not_started).
node_rebalance_stop(Node) ->
rpc:call(Node, emqx_node_rebalance, stop, []).
%% Introduced in v2:
-spec node_rebalance_purge_start(node(), emqx_node_rebalance_purge:start_opts()) ->
emqx_rpc:badrpc() | ok_or_error(emqx_node_rebalance_purge:start_error()).
node_rebalance_purge_start(Node, #{} = Opts) ->
rpc:call(Node, emqx_node_rebalance_purge, start, [Opts]).
-spec node_rebalance_purge_stop(node()) ->
emqx_rpc:badrpc() | ok_or_error(emqx_node_rebalance_purge:stop_error()).
node_rebalance_purge_stop(Node) ->
rpc:call(Node, emqx_node_rebalance_purge, stop, []).

View File

@ -0,0 +1,84 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_node_rebalance_proto_v2).
-behaviour(emqx_bpapi).
-export([
introduced_in/0,
available_nodes/1,
evict_connections/2,
evict_sessions/4,
connection_counts/1,
session_counts/1,
enable_rebalance_agent/2,
disable_rebalance_agent/2,
disconnected_session_counts/1,
%% Introduced in v2:
enable_rebalance_agent/3,
disable_rebalance_agent/3,
purge_sessions/2
]).
-include_lib("emqx/include/bpapi.hrl").
-include_lib("emqx/include/types.hrl").
introduced_in() ->
"5.2.1".
-spec available_nodes([node()]) -> emqx_rpc:multicall_result(node()).
available_nodes(Nodes) ->
rpc:multicall(Nodes, emqx_node_rebalance, is_node_available, []).
-spec evict_connections([node()], non_neg_integer()) ->
emqx_rpc:multicall_result(ok_or_error(disabled)).
evict_connections(Nodes, Count) ->
rpc:multicall(Nodes, emqx_eviction_agent, evict_connections, [Count]).
-spec evict_sessions([node()], non_neg_integer(), [node()], emqx_channel:conn_state()) ->
emqx_rpc:multicall_result(ok_or_error(disabled)).
evict_sessions(Nodes, Count, RecipientNodes, ConnState) ->
rpc:multicall(Nodes, emqx_eviction_agent, evict_sessions, [Count, RecipientNodes, ConnState]).
-spec connection_counts([node()]) -> emqx_rpc:multicall_result({ok, non_neg_integer()}).
connection_counts(Nodes) ->
rpc:multicall(Nodes, emqx_node_rebalance, connection_count, []).
-spec session_counts([node()]) -> emqx_rpc:multicall_result({ok, non_neg_integer()}).
session_counts(Nodes) ->
rpc:multicall(Nodes, emqx_node_rebalance, session_count, []).
-spec enable_rebalance_agent([node()], pid()) ->
emqx_rpc:multicall_result(ok_or_error(already_enabled | eviction_agent_busy)).
enable_rebalance_agent(Nodes, OwnerPid) ->
rpc:multicall(Nodes, emqx_node_rebalance_agent, enable, [OwnerPid]).
-spec disable_rebalance_agent([node()], pid()) ->
emqx_rpc:multicall_result(ok_or_error(already_disabled | invalid_coordinator)).
disable_rebalance_agent(Nodes, OwnerPid) ->
rpc:multicall(Nodes, emqx_node_rebalance_agent, disable, [OwnerPid]).
-spec disconnected_session_counts([node()]) -> emqx_rpc:multicall_result({ok, non_neg_integer()}).
disconnected_session_counts(Nodes) ->
rpc:multicall(Nodes, emqx_node_rebalance, disconnected_session_count, []).
%% Introduced in v2:
-spec enable_rebalance_agent([node()], pid(), emqx_eviction_agent:kind()) ->
emqx_rpc:multicall_result(ok_or_error(already_enabled | eviction_agent_busy)).
enable_rebalance_agent(Nodes, OwnerPid, Kind) ->
rpc:multicall(Nodes, emqx_node_rebalance_agent, enable, [OwnerPid, Kind]).
-spec disable_rebalance_agent([node()], pid(), emqx_eviction_agent:kind()) ->
emqx_rpc:multicall_result(ok_or_error(already_disabled | invalid_coordinator)).
disable_rebalance_agent(Nodes, OwnerPid, Kind) ->
rpc:multicall(Nodes, emqx_node_rebalance_agent, disable, [OwnerPid, Kind]).
-spec purge_sessions([node()], non_neg_integer()) ->
emqx_rpc:multicall_result(ok_or_error(disabled)).
purge_sessions(Nodes, Count) ->
rpc:multicall(Nodes, emqx_eviction_agent, purge_sessions, [Count]).

View File

@ -0,0 +1,29 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_node_rebalance_purge_proto_v1).
-behaviour(emqx_bpapi).
-export([
introduced_in/0,
start/2,
stop/1
]).
-include_lib("emqx/include/bpapi.hrl").
introduced_in() ->
"5.2.1".
-spec start([node()], emqx_node_rebalance_purge:start_opts()) ->
emqx_rpc:erpc_multicall(ok | {error, emqx_node_rebalance_purge:start_error()}).
start(Nodes, Opts) ->
erpc:multicall(Nodes, emqx_node_rebalance_purge, start, [Opts]).
-spec stop([node()]) ->
emqx_rpc:erpc_multicall(ok | {error, emqx_node_rebalance_purge:stop_error()}).
stop(Nodes) ->
erpc:multicall(Nodes, emqx_node_rebalance_purge, stop, []).

View File

@ -0,0 +1,46 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_node_rebalance_status_proto_v2).
-behaviour(emqx_bpapi).
-export([
introduced_in/0,
local_status/1,
rebalance_status/1,
evacuation_status/1,
%% Introduced in v2:
purge_status/1
]).
-include_lib("emqx/include/bpapi.hrl").
-include_lib("emqx/include/types.hrl").
introduced_in() ->
"5.2.1".
-spec local_status(node()) ->
emqx_rpc:badrpc() | disabled | {evacuation, map()} | {rebalance, map()}.
local_status(Node) ->
rpc:call(Node, emqx_node_rebalance_status, local_status, []).
-spec rebalance_status([node()]) ->
emqx_rpc:multicall_result({node(), map()}).
rebalance_status(Nodes) ->
rpc:multicall(Nodes, emqx_node_rebalance_status, rebalance_status, []).
-spec evacuation_status([node()]) ->
emqx_rpc:multicall_result({node(), map()}).
evacuation_status(Nodes) ->
rpc:multicall(Nodes, emqx_node_rebalance_status, evacuation_status, []).
%% Introduced in v2:
-spec purge_status([node()]) ->
emqx_rpc:multicall_result({node(), map()}).
purge_status(Nodes) ->
rpc:multicall(Nodes, emqx_node_rebalance_status, purge_status, []).

View File

@ -38,32 +38,35 @@ end_per_suite(_Config) ->
ok.
init_per_testcase(Case, Config) ->
[{DonorNode, _} | _] =
ClusterNodes = emqx_eviction_agent_test_helpers:start_cluster(
[
{case_specific_node_name(?MODULE, Case, '_donor'), 2883},
{case_specific_node_name(?MODULE, Case, '_recipient'), 3883}
],
?START_APPS,
[{emqx, data_dir, case_specific_data_dir(Case, Config)}]
DonorNode = case_specific_node_name(?MODULE, Case, '_donor'),
RecipientNode = case_specific_node_name(?MODULE, Case, '_recipient'),
Spec = #{
role => core,
join_to => emqx_cth_cluster:node_name(DonorNode),
listeners => true,
apps => app_specs()
},
Cluster = [{Node, Spec} || Node <- [DonorNode, RecipientNode]],
ClusterNodes =
[Node1 | _] = emqx_cth_cluster:start(
Cluster,
#{work_dir => ?config(priv_dir, Config)}
),
ok = rpc:call(DonorNode, emqx_mgmt_api_test_util, init_suite, []),
ok = take_auth_header_from(DonorNode),
ok = rpc:call(Node1, emqx_mgmt_api_test_util, init_suite, []),
ok = take_auth_header_from(Node1),
[{cluster_nodes, ClusterNodes} | Config].
end_per_testcase(_Case, Config) ->
_ = emqx_eviction_agent_test_helpers:stop_cluster(
?config(cluster_nodes, Config),
?START_APPS
).
Nodes = ?config(cluster_nodes, Config),
erpc:multicall(Nodes, meck, unload, []),
_ = emqx_cth_cluster:stop(Nodes),
ok.
%%--------------------------------------------------------------------
%% Tests
%%--------------------------------------------------------------------
t_start_evacuation_validation(Config) ->
[{DonorNode, _}, {RecipientNode, _}] = ?config(cluster_nodes, Config),
[DonorNode, RecipientNode] = ?config(cluster_nodes, Config),
BadOpts = [
#{conn_evict_rate => <<"conn">>},
#{sess_evict_rate => <<"sess">>},
@ -117,10 +120,87 @@ t_start_evacuation_validation(Config) ->
api_get(["load_rebalance", "global_status"])
).
%% TODO: uncomment after we officially release the feature.
skipped_t_start_purge_validation(Config) ->
[Node1 | _] = ?config(cluster_nodes, Config),
Port1 = get_mqtt_port(Node1, tcp),
BadOpts = [
#{purge_rate => <<"conn">>},
#{purge_rate => 0},
#{purge_rate => -1},
#{purge_rate => 1.1},
#{unknown => <<"Value">>}
],
lists:foreach(
fun(Opts) ->
?assertMatch(
{ok, 400, #{}},
api_post(
["load_rebalance", atom_to_list(Node1), "purge", "start"],
Opts
),
Opts
)
end,
BadOpts
),
?assertMatch(
{ok, 404, #{}},
api_post(
["load_rebalance", "bad@node", "purge", "start"],
#{}
)
),
process_flag(trap_exit, true),
Conns = emqtt_connect_many(Port1, 100),
?assertMatch(
{ok, 200, #{}},
api_post(
["load_rebalance", atom_to_list(Node1), "purge", "start"],
#{purge_rate => 10}
)
),
Node1Bin = atom_to_binary(Node1),
?assertMatch(
{ok, 200, #{<<"purges">> := [#{<<"node">> := Node1Bin}]}},
api_get(["load_rebalance", "global_status"])
),
?assertMatch(
{ok, 200, #{
<<"process">> := <<"purge">>,
<<"purge_rate">> := 10,
<<"session_goal">> := 0,
<<"state">> := <<"purging">>,
<<"stats">> :=
#{
<<"current_sessions">> := _,
<<"initial_sessions">> := 100
}
}},
api_get(["load_rebalance", "status"])
),
?assertMatch(
{ok, 200, #{}},
api_post(
["load_rebalance", atom_to_list(Node1), "purge", "stop"],
#{}
)
),
ok = stop_many(Conns),
ok.
t_start_rebalance_validation(Config) ->
process_flag(trap_exit, true),
[{DonorNode, DonorPort}, {RecipientNode, _}] = ?config(cluster_nodes, Config),
[DonorNode, RecipientNode] = ?config(cluster_nodes, Config),
DonorPort = get_mqtt_port(DonorNode, tcp),
BadOpts = [
#{conn_evict_rate => <<"conn">>},
@ -189,7 +269,7 @@ t_start_rebalance_validation(Config) ->
ok = stop_many(Conns).
t_start_stop_evacuation(Config) ->
[{DonorNode, _}, {RecipientNode, _}] = ?config(cluster_nodes, Config),
[DonorNode, RecipientNode] = ?config(cluster_nodes, Config),
StartOpts = maps:merge(
maps:get(evacuation, emqx_node_rebalance_api:rebalance_evacuation_example()),
@ -284,7 +364,8 @@ t_start_stop_evacuation(Config) ->
t_start_stop_rebalance(Config) ->
process_flag(trap_exit, true),
[{DonorNode, DonorPort}, {RecipientNode, _}] = ?config(cluster_nodes, Config),
[DonorNode, RecipientNode] = ?config(cluster_nodes, Config),
DonorPort = get_mqtt_port(DonorNode, tcp),
?assertMatch(
{ok, 200, #{<<"status">> := <<"disabled">>}},
@ -390,7 +471,7 @@ t_start_stop_rebalance(Config) ->
ok = stop_many(Conns).
t_availability_check(Config) ->
[{DonorNode, _} | _] = ?config(cluster_nodes, Config),
[DonorNode | _] = ?config(cluster_nodes, Config),
?assertMatch(
{ok, 200, #{}},
api_get(["load_rebalance", "availability_check"])
@ -425,7 +506,12 @@ api_get(Path) ->
api_post(Path, Data) ->
case request(post, uri(Path), Data) of
{ok, Code, ResponseBody} ->
{ok, Code, jiffy:decode(ResponseBody, [return_maps])};
Res =
case emqx_utils_json:safe_decode(ResponseBody, [return_maps]) of
{ok, Decoded} -> Decoded;
{error, _} -> ResponseBody
end,
{ok, Code, Res};
{error, _} = Error ->
Error
end.
@ -444,3 +530,26 @@ case_specific_data_dir(Case, Config) ->
undefined -> undefined;
PrivDir -> filename:join(PrivDir, atom_to_list(Case))
end.
app_specs() ->
[
{emqx, #{
before_start => fun() ->
emqx_app:set_config_loader(?MODULE)
end,
override_env => [{boot_modules, [broker, listeners]}]
}},
{emqx_retainer, #{
config =>
#{
retainer =>
#{enable => true}
}
}},
emqx_eviction_agent,
emqx_node_rebalance
].
get_mqtt_port(Node, Type) ->
{_IP, Port} = erpc:call(Node, emqx_config, get, [[listeners, Type, default, bind]]),
Port.

View File

@ -156,6 +156,80 @@ t_evacuation(_Config) ->
emqx_node_rebalance_evacuation:status()
).
t_purge(_Config) ->
%% start with invalid args
?assertNot(
emqx_node_rebalance_cli:cli(["start", "--purge", "--foo-bar"])
),
?assertNot(
emqx_node_rebalance_cli:cli(["start", "--purge", "--purge-rate", "foobar"])
),
%% not used by this scenario
?assertNot(
emqx_node_rebalance_cli:cli(["start", "--purge", "--conn-evict-rate", "1"])
),
?assertNot(
emqx_node_rebalance_cli:cli(["start", "--purge", "--sess-evict-rate", "1"])
),
?assertNot(
emqx_node_rebalance_cli:cli(["start", "--purge", "--wait-takeover", "1"])
),
?assertNot(
emqx_node_rebalance_cli:cli([
"start",
"--purge",
"--migrate-to",
atom_to_list(node())
])
),
with_some_sessions(fun() ->
?assert(
emqx_node_rebalance_cli:cli([
"start",
"--purge",
"--purge-rate",
"10"
])
),
%% status
ok = emqx_node_rebalance_cli:cli(["status"]),
ok = emqx_node_rebalance_cli:cli(["node-status"]),
ok = emqx_node_rebalance_cli:cli(["node-status", atom_to_list(node())]),
?assertMatch(
{enabled, #{}},
emqx_node_rebalance_purge:status()
),
%% already enabled
?assertNot(
emqx_node_rebalance_cli:cli([
"start",
"--purge",
"--purge-rate",
"10"
])
),
true = emqx_node_rebalance_cli:cli(["stop"]),
ok
end),
%% stop
false = emqx_node_rebalance_cli:cli(["stop"]),
?assertEqual(
disabled,
emqx_node_rebalance_purge:status()
),
ok.
t_rebalance(Config) ->
process_flag(trap_exit, true),
@ -289,3 +363,12 @@ emqx_node_rebalance_cli(Node, Args) ->
Result ->
Result
end.
%% to avoid it finishing too fast
with_some_sessions(Fn) ->
emqx_common_test_helpers:with_mock(
emqx_eviction_agent,
all_channels_count,
fun() -> 100 end,
Fn
).

View File

@ -0,0 +1,360 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_node_rebalance_purge_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("emqx/include/asserts.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-import(
emqx_eviction_agent_test_helpers,
[
emqtt_connect/1,
emqtt_try_connect/1,
case_specific_node_name/3
]
).
all() ->
[{group, one_node}, {group, two_nodes}].
groups() ->
[
{one_node, [], one_node_cases()},
{two_nodes, [], two_nodes_cases()}
].
two_nodes_cases() ->
[
t_already_started_two,
t_session_purged
].
one_node_cases() ->
emqx_common_test_helpers:all(?MODULE) -- two_nodes_cases().
init_per_suite(Config) ->
ok = emqx_common_test_helpers:start_apps([]),
Config.
end_per_suite(_Config) ->
ok = emqx_common_test_helpers:stop_apps([]),
ok.
init_per_group(one_node, Config) ->
[{cluster_type, one_node} | Config];
init_per_group(two_nodes, Config) ->
[{cluster_type, two_nodes} | Config].
end_per_group(_Group, _Config) ->
ok.
init_per_testcase(TestCase, Config) ->
ct:timetrap({seconds, 30}),
Nodes =
[Node1 | _] =
case ?config(cluster_type, Config) of
one_node ->
[case_specific_node_name(?MODULE, TestCase, '_1')];
two_nodes ->
[
case_specific_node_name(?MODULE, TestCase, '_1'),
case_specific_node_name(?MODULE, TestCase, '_2')
]
end,
Spec = #{
role => core,
join_to => emqx_cth_cluster:node_name(Node1),
listeners => true,
apps => app_specs()
},
Cluster = [{Node, Spec} || Node <- Nodes],
ClusterNodes = emqx_cth_cluster:start(
Cluster,
#{work_dir => ?config(priv_dir, Config)}
),
ok = snabbkaffe:start_trace(),
[{cluster_nodes, ClusterNodes} | Config].
end_per_testcase(_TestCase, Config) ->
Nodes = ?config(cluster_nodes, Config),
ok = snabbkaffe:stop(),
erpc:multicall(Nodes, meck, unload, []),
ok = emqx_cth_cluster:stop(Nodes),
ok.
%%--------------------------------------------------------------------
%% Helpers
%%--------------------------------------------------------------------
app_specs() ->
[
{emqx, #{
before_start => fun() ->
emqx_app:set_config_loader(?MODULE)
end,
override_env => [{boot_modules, [broker, listeners]}]
}},
{emqx_retainer, #{
config =>
#{
retainer =>
#{enable => true}
}
}},
{emqx_modules, #{
config =>
#{delayed => #{enable => true}}
}},
emqx_eviction_agent,
emqx_node_rebalance
].
opts(_Config) ->
#{
purge_rate => 10
}.
case_specific_data_dir(Case, Config) ->
case ?config(priv_dir, Config) of
undefined -> undefined;
PrivDir -> filename:join(PrivDir, atom_to_list(Case))
end.
get_mqtt_port(Node, Type) ->
{_IP, Port} = erpc:call(Node, emqx_config, get, [[listeners, Type, default, bind]]),
Port.
%% to avoid it finishing too fast
with_some_sessions(Node, Fn) ->
erpc:call(Node, fun() ->
emqx_common_test_helpers:with_mock(
emqx_eviction_agent,
all_channels_count,
fun() -> 100 end,
Fn
)
end).
drain_exits([ClientPid | Rest]) ->
receive
{'EXIT', ClientPid, _Reason} ->
drain_exits(Rest)
after 1_000 ->
ct:pal("mailbox:\n ~p", [process_info(self(), messages)]),
ct:fail("pid ~p didn't die", [ClientPid])
end;
drain_exits([]) ->
ok.
emqtt_connect_many(Port, Count) ->
emqtt_connect_many(Port, Count, _StartN = 1).
%% start many clients with mixed clean_start flags
emqtt_connect_many(Port, Count, StartN) ->
lists:map(
fun(N) ->
NBin = integer_to_binary(N),
ClientId = <<"client-", NBin/binary>>,
CleanStart = N rem 2 == 0,
{ok, C} = emqtt_connect([{clientid, ClientId}, {clean_start, CleanStart}, {port, Port}]),
C
end,
lists:seq(StartN, StartN + Count - 1)
).
%%--------------------------------------------------------------------
%% Test Cases : one node
%%--------------------------------------------------------------------
t_agent_busy(Config) ->
[Node] = ?config(cluster_nodes, Config),
ok = rpc:call(Node, emqx_eviction_agent, enable, [other_rebalance, undefined]),
erpc:call(Node, fun() ->
?assertExit(
{{{bad_nodes, [{Node, {error, eviction_agent_busy}}]}, _}, _},
emqx_node_rebalance_purge:start(opts(Config))
)
end),
ok.
t_already_started(Config) ->
[Node] = ?config(cluster_nodes, Config),
with_some_sessions(Node, fun() ->
ok = emqx_node_rebalance_purge:start(opts(Config)),
?assertEqual(
{error, already_started},
emqx_node_rebalance_purge:start(opts(Config))
),
?assertEqual(
ok,
emqx_node_rebalance_purge:stop()
),
ok
end),
ok.
t_not_started(Config) ->
[Node] = ?config(cluster_nodes, Config),
?assertEqual(
{error, not_started},
rpc:call(Node, emqx_node_rebalance_purge, stop, [])
).
t_start(Config) ->
[Node] = ?config(cluster_nodes, Config),
Port = get_mqtt_port(Node, tcp),
with_some_sessions(Node, fun() ->
process_flag(trap_exit, true),
ok = snabbkaffe:start_trace(),
?assertEqual(
ok,
emqx_node_rebalance_purge:start(opts(Config))
),
?assertEqual({error, {use_another_server, #{}}}, emqtt_try_connect([{port, Port}])),
ok
end),
ok.
t_non_persistence(Config) ->
[Node] = ?config(cluster_nodes, Config),
Port = get_mqtt_port(Node, tcp),
%% to avoid it finishing too fast
with_some_sessions(Node, fun() ->
process_flag(trap_exit, true),
ok = snabbkaffe:start_trace(),
?assertEqual(
ok,
emqx_node_rebalance_purge:start(opts(Config))
),
?assertMatch(
{error, {use_another_server, #{}}},
emqtt_try_connect([{port, Port}])
),
ok = supervisor:terminate_child(emqx_node_rebalance_sup, emqx_node_rebalance_purge),
{ok, _} = supervisor:restart_child(emqx_node_rebalance_sup, emqx_node_rebalance_purge),
?assertMatch(
ok,
emqtt_try_connect([{port, Port}])
),
?assertMatch(disabled, emqx_node_rebalance_purge:status()),
ok
end),
ok.
t_unknown_messages(Config) ->
process_flag(trap_exit, true),
[Node] = ?config(cluster_nodes, Config),
ok = rpc:call(Node, emqx_node_rebalance_purge, start, [opts(Config)]),
Pid = rpc:call(Node, erlang, whereis, [emqx_node_rebalance_purge]),
Pid ! unknown,
ok = gen_server:cast(Pid, unknown),
?assertEqual(
ignored,
gen_server:call(Pid, unknown)
),
ok.
%%--------------------------------------------------------------------
%% Test Cases : two nodes
%%--------------------------------------------------------------------
t_already_started_two(Config) ->
[Node1, _Node2] = ?config(cluster_nodes, Config),
with_some_sessions(Node1, fun() ->
ok = emqx_node_rebalance_purge:start(opts(Config)),
?assertEqual(
{error, already_started},
emqx_node_rebalance_purge:start(opts(Config))
),
?assertEqual(
ok,
emqx_node_rebalance_purge:stop()
),
ok
end),
?assertEqual(
{error, not_started},
rpc:call(Node1, emqx_node_rebalance_purge, stop, [])
),
ok.
t_session_purged(Config) ->
process_flag(trap_exit, true),
[Node1, Node2] = ?config(cluster_nodes, Config),
Port1 = get_mqtt_port(Node1, tcp),
Port2 = get_mqtt_port(Node2, tcp),
%% N.B.: it's important to have an asymmetric number of clients for this test, as
%% otherwise the scenario might happen to finish successfully for the wrong
%% reasons!
NumClientsNode1 = 5,
NumClientsNode2 = 35,
Node1Clients = emqtt_connect_many(Port1, NumClientsNode1, _StartN1 = 1),
Node2Clients = emqtt_connect_many(Port2, NumClientsNode2, _StartN2 = 21),
lists:foreach(
fun(C) ->
ClientId = proplists:get_value(clientid, emqtt:info(C)),
Topic = emqx_topic:join([<<"t">>, ClientId]),
Props = #{},
Payload = ClientId,
Opts = [{retain, true}],
ok = emqtt:publish(C, Topic, Props, Payload, Opts),
DelayedTopic = emqx_topic:join([<<"$delayed/120">>, Topic]),
ok = emqtt:publish(C, DelayedTopic, Payload),
{ok, _, [?RC_GRANTED_QOS_0]} = emqtt:subscribe(C, Topic),
ok
end,
Node1Clients ++ Node2Clients
),
?assertEqual(40, erpc:call(Node2, emqx_retainer, retained_count, [])),
?assertEqual(NumClientsNode1, erpc:call(Node1, emqx_delayed, delayed_count, [])),
?assertEqual(NumClientsNode2, erpc:call(Node2, emqx_delayed, delayed_count, [])),
{ok, SRef0} = snabbkaffe:subscribe(
?match_event(#{?snk_kind := "cluster_purge_done"}),
15_000
),
ok = rpc:call(Node1, emqx_node_rebalance_purge, start, [opts(Config)]),
{ok, _} = snabbkaffe:receive_events(SRef0),
?assertEqual([], erpc:call(Node1, emqx_cm, all_channels, [])),
?assertEqual([], erpc:call(Node2, emqx_cm, all_channels, [])),
?assertEqual(0, erpc:call(Node1, emqx_retainer, retained_count, [])),
?assertEqual(0, erpc:call(Node2, emqx_retainer, retained_count, [])),
?assertEqual(0, erpc:call(Node1, emqx_delayed, delayed_count, [])),
?assertEqual(0, erpc:call(Node2, emqx_delayed, delayed_count, [])),
ok = drain_exits(Node1Clients ++ Node2Clients),
ok.

View File

@ -57,7 +57,7 @@ end_per_suite(Config) ->
t_cluster_status(Config) ->
[CoreNode, ReplicantNode] = ?config(cluster_nodes, Config),
ok = emqx_node_rebalance_api_proto_v1:node_rebalance_evacuation_start(CoreNode, #{}),
ok = emqx_node_rebalance_api_proto_v2:node_rebalance_evacuation_start(CoreNode, #{}),
?assertMatch(
#{evacuations := [_], rebalances := []},

View File

@ -0,0 +1 @@
Added a CLI command to wipe session, retained message, and delayed message data on the whole cluster.
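A usage sketch based on the CLI handlers and tests in this PR (the help entry for the command stays commented out until the feature is officially released); the calls below go through the Erlang entry point behind the rebalance ctl command:

%% Equivalent of `rebalance start --purge --purge-rate 10` via emqx_ctl;
%% returns true on success and false on argument or start errors.
true = emqx_node_rebalance_cli:cli(["start", "--purge", "--purge-rate", "10"]).
%% Print global status, which now also lists running purges.
ok = emqx_node_rebalance_cli:cli(["status"]).
%% `rebalance stop` now also stops a running purge.
true = emqx_node_rebalance_cli:cli(["stop"]).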

View File

@ -42,6 +42,18 @@ load_rebalance_evacuation_stop.desc:
load_rebalance_evacuation_stop.label:
"""Stop evacuation"""
cluster_purge_start.desc:
"""Start purge process"""
cluster_purge_start.label:
"""Start purge"""
cluster_purge_stop.desc:
"""Stop purge process"""
cluster_purge_stop.label:
"""Stop purge"""
param_node.desc:
"""Node name"""
@ -150,6 +162,12 @@ local_status_session_eviction_rate.desc:
local_status_session_eviction_rate.label:
"""Session eviction rate"""
local_status_purge_rate.desc:
"""The rate of purging sessions, in sessions per second"""
local_status_purge_rate.label:
"""Session purge rate"""
local_status_connection_goal.desc:
"""The number of connections that the node should have after the rebalance/evacuation process"""