From e93e9ed1081737874b8ec899187341261184ba88 Mon Sep 17 00:00:00 2001 From: Ilya Averyanov Date: Fri, 17 Nov 2023 12:32:16 +0300 Subject: [PATCH] feat(rebalance): improve rebalance usability * make availability API endpoint public * allow connections during wait_health_check interval * make availability status calculation more consistent and lightweight * refactor test to get rid of some mocks and to use cth --- apps/emqx/priv/bpapi.versions | 1 + .../src/emqx_eviction_agent.app.src | 2 +- .../src/emqx_eviction_agent.erl | 64 +++++-- .../test/emqx_eviction_agent_SUITE.erl | 72 ++++++- .../test/emqx_eviction_agent_api_SUITE.erl | 19 +- .../emqx_eviction_agent_channel_SUITE.erl | 18 +- .../test/emqx_eviction_agent_cli_SUITE.erl | 16 +- .../test/emqx_eviction_agent_test_helpers.erl | 80 ++++---- .../src/emqx_node_rebalance.app.src | 5 +- .../src/emqx_node_rebalance.erl | 16 +- .../src/emqx_node_rebalance_agent.erl | 175 +++++++++++------- .../src/emqx_node_rebalance_api.erl | 9 +- .../src/emqx_node_rebalance_evacuation.erl | 49 +++-- .../src/emqx_node_rebalance_purge.erl | 2 +- .../src/emqx_node_rebalance_status.erl | 8 + .../proto/emqx_node_rebalance_proto_v3.erl | 96 ++++++++++ .../test/emqx_node_rebalance_SUITE.erl | 79 ++++++-- .../test/emqx_node_rebalance_agent_SUITE.erl | 38 ++-- .../test/emqx_node_rebalance_api_SUITE.erl | 30 +-- .../test/emqx_node_rebalance_cli_SUITE.erl | 114 ++++++------ .../emqx_node_rebalance_evacuation_SUITE.erl | 53 +++--- .../test/emqx_node_rebalance_purge_SUITE.erl | 36 ++-- .../test/emqx_node_rebalance_status_SUITE.erl | 1 + changes/ee/feat-11971.en.md | 4 + 24 files changed, 657 insertions(+), 330 deletions(-) create mode 100644 apps/emqx_node_rebalance/src/proto/emqx_node_rebalance_proto_v3.erl create mode 100644 changes/ee/feat-11971.en.md diff --git a/apps/emqx/priv/bpapi.versions b/apps/emqx/priv/bpapi.versions index 7042f5186..ea4ce159d 100644 --- a/apps/emqx/priv/bpapi.versions +++ b/apps/emqx/priv/bpapi.versions @@ -43,6 +43,7 @@ {emqx_mgmt_trace,2}. {emqx_node_rebalance,1}. {emqx_node_rebalance,2}. +{emqx_node_rebalance,3}. {emqx_node_rebalance_api,1}. {emqx_node_rebalance_api,2}. {emqx_node_rebalance_evacuation,1}. diff --git a/apps/emqx_eviction_agent/src/emqx_eviction_agent.app.src b/apps/emqx_eviction_agent/src/emqx_eviction_agent.app.src index 4f4cf5722..cc415d495 100644 --- a/apps/emqx_eviction_agent/src/emqx_eviction_agent.app.src +++ b/apps/emqx_eviction_agent/src/emqx_eviction_agent.app.src @@ -1,6 +1,6 @@ {application, emqx_eviction_agent, [ {description, "EMQX Eviction Agent"}, - {vsn, "5.1.4"}, + {vsn, "5.1.5"}, {registered, [ emqx_eviction_agent_sup, emqx_eviction_agent, diff --git a/apps/emqx_eviction_agent/src/emqx_eviction_agent.erl b/apps/emqx_eviction_agent/src/emqx_eviction_agent.erl index 42cffcb3d..9f1352b7c 100644 --- a/apps/emqx_eviction_agent/src/emqx_eviction_agent.erl +++ b/apps/emqx_eviction_agent/src/emqx_eviction_agent.erl @@ -15,8 +15,11 @@ -export([ start_link/0, enable/2, + enable/3, + default_options/0, disable/1, status/0, + enable_status/0, connection_count/0, all_channels_count/0, session_count/0, @@ -51,7 +54,7 @@ unhook/0 ]). --export_type([server_reference/0]). +-export_type([server_reference/0, kind/0, options/0]). -define(CONN_MODULES, [ emqx_connection, emqx_ws_connection, emqx_quic_connection, emqx_eviction_agent_channel @@ -67,15 +70,31 @@ connections := non_neg_integer(), sessions := non_neg_integer() }. --type kind() :: atom(). + +%% kind() is any() because it was not exported previously +%% and bpapi checker remembered it as any() +-type kind() :: any(). +-type options() :: #{ + allow_connections => boolean() +}. -spec start_link() -> startlink_ret(). start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). +-spec default_options() -> options(). +default_options() -> + #{ + allow_connections => false + }. + -spec enable(kind(), server_reference()) -> ok_or_error(eviction_agent_busy). enable(Kind, ServerReference) -> - gen_server:call(?MODULE, {enable, Kind, ServerReference}). + gen_server:call(?MODULE, {enable, Kind, ServerReference, default_options()}). + +-spec enable(kind(), server_reference(), options()) -> ok_or_error(eviction_agent_busy). +enable(Kind, ServerReference, #{} = Options) -> + gen_server:call(?MODULE, {enable, Kind, ServerReference, Options}). -spec disable(kind()) -> ok. disable(Kind) -> @@ -84,16 +103,20 @@ disable(Kind) -> -spec status() -> status(). status() -> case enable_status() of - {enabled, _Kind, _ServerReference} -> + {enabled, _Kind, _ServerReference, _Options} -> {enabled, stats()}; disabled -> disabled end. +-spec enable_status() -> disabled | {enabled, kind(), server_reference(), options()}. +enable_status() -> + persistent_term:get(?MODULE, disabled). + -spec evict_connections(pos_integer()) -> ok_or_error(disabled). evict_connections(N) -> case enable_status() of - {enabled, _Kind, ServerReference} -> + {enabled, _Kind, ServerReference, _Options} -> ok = do_evict_connections(N, ServerReference); disabled -> {error, disabled} @@ -112,15 +135,16 @@ evict_sessions(N, Nodes, ConnState) when is_list(Nodes) andalso length(Nodes) > 0 -> case enable_status() of - {enabled, _Kind, _ServerReference} -> + {enabled, _Kind, _ServerReference, _Options} -> ok = do_evict_sessions(N, Nodes, ConnState); disabled -> {error, disabled} end. +-spec purge_sessions(non_neg_integer()) -> ok_or_error(disabled). purge_sessions(N) -> case enable_status() of - {enabled, _Kind, _ServerReference} -> + {enabled, _Kind, _ServerReference, _Options} -> ok = do_purge_sessions(N); disabled -> {error, disabled} @@ -135,14 +159,14 @@ init([]) -> {ok, #{}}. %% enable -handle_call({enable, Kind, ServerReference}, _From, St) -> +handle_call({enable, Kind, ServerReference, Options}, _From, St) -> Reply = case enable_status() of disabled -> - ok = persistent_term:put(?MODULE, {enabled, Kind, ServerReference}); - {enabled, Kind, _ServerReference} -> - ok = persistent_term:put(?MODULE, {enabled, Kind, ServerReference}); - {enabled, _OtherKind, _ServerReference} -> + ok = persistent_term:put(?MODULE, {enabled, Kind, ServerReference, Options}); + {enabled, Kind, _ServerReference, _Options} -> + ok = persistent_term:put(?MODULE, {enabled, Kind, ServerReference, Options}); + {enabled, _OtherKind, _ServerReference, _Options} -> {error, eviction_agent_busy} end, {reply, Reply, St}; @@ -152,10 +176,10 @@ handle_call({disable, Kind}, _From, St) -> case enable_status() of disabled -> {error, disabled}; - {enabled, Kind, _ServerReference} -> + {enabled, Kind, _ServerReference, _Options} -> _ = persistent_term:erase(?MODULE), ok; - {enabled, _OtherKind, _ServerReference} -> + {enabled, _OtherKind, _ServerReference, _Options} -> {error, eviction_agent_busy} end, {reply, Reply, St}; @@ -180,8 +204,10 @@ code_change(_Vsn, State, _Extra) -> on_connect(_ConnInfo, _Props) -> case enable_status() of - {enabled, _Kind, _ServerReference} -> + {enabled, _Kind, _ServerReference, #{allow_connections := false}} -> {stop, {error, ?RC_USE_ANOTHER_SERVER}}; + {enabled, _Kind, _ServerReference, _Options} -> + ignore; disabled -> ignore end. @@ -192,7 +218,7 @@ on_connack( Props ) -> case enable_status() of - {enabled, _Kind, ServerReference} -> + {enabled, _Kind, ServerReference, _Options} -> {ok, Props#{'Server-Reference' => ServerReference}}; disabled -> {ok, Props} @@ -214,10 +240,10 @@ unhook() -> ok = emqx_hooks:del('client.connect', {?MODULE, on_connect}), ok = emqx_hooks:del('client.connack', {?MODULE, on_connack}). -enable_status() -> - persistent_term:get(?MODULE, disabled). +%%-------------------------------------------------------------------- +%% Internal funcs +%%-------------------------------------------------------------------- -% connection management stats() -> #{ connections => connection_count(), diff --git a/apps/emqx_eviction_agent/test/emqx_eviction_agent_SUITE.erl b/apps/emqx_eviction_agent/test/emqx_eviction_agent_SUITE.erl index bc6f626d2..bf2865a78 100644 --- a/apps/emqx_eviction_agent/test/emqx_eviction_agent_SUITE.erl +++ b/apps/emqx_eviction_agent/test/emqx_eviction_agent_SUITE.erl @@ -15,7 +15,11 @@ -import( emqx_eviction_agent_test_helpers, - [emqtt_connect/0, emqtt_connect/1, emqtt_connect/2, emqtt_connect_for_publish/1] + [ + emqtt_connect/0, emqtt_connect/1, emqtt_connect/2, + emqtt_connect_for_publish/1, + case_specific_node_name/1 + ] ). -define(assertPrinted(Printed, Code), @@ -29,11 +33,19 @@ all() -> emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> - emqx_common_test_helpers:start_apps([emqx_eviction_agent]), - Config. + Apps = emqx_cth_suite:start( + [ + emqx, + emqx_eviction_agent + ], + #{ + work_dir => emqx_cth_suite:work_dir(Config) + } + ), + [{apps, Apps} | Config]. -end_per_suite(_Config) -> - emqx_common_test_helpers:stop_apps([emqx_eviction_agent]). +end_per_suite(Config) -> + ok = emqx_cth_suite:stop(?config(apps, Config)). init_per_testcase(Case, Config) -> _ = emqx_eviction_agent:disable(test_eviction), @@ -41,10 +53,17 @@ init_per_testcase(Case, Config) -> start_slave(Case, Config). start_slave(t_explicit_session_takeover, Config) -> + NodeNames = + [ + t_explicit_session_takeover_donor, + t_explicit_session_takeover_recipient + ], ClusterNodes = emqx_eviction_agent_test_helpers:start_cluster( - [{evacuate_test1, 2883}, {evacuate_test2, 3883}], - [emqx_eviction_agent] + Config, + NodeNames, + [emqx_conf, emqx, emqx_eviction_agent] ), + ok = snabbkaffe:start_trace(), [{evacuate_nodes, ClusterNodes} | Config]; start_slave(_Case, Config) -> Config. @@ -56,8 +75,7 @@ end_per_testcase(TestCase, Config) -> stop_slave(t_explicit_session_takeover, Config) -> emqx_eviction_agent_test_helpers:stop_cluster( - ?config(evacuate_nodes, Config), - [emqx_eviction_agent] + ?config(evacuate_nodes, Config) ); stop_slave(_Case, _Config) -> ok. @@ -77,13 +95,16 @@ t_enable_disable(_Config) -> {ok, C0} = emqtt_connect(), ok = emqtt:disconnect(C0), + %% Enable ok = emqx_eviction_agent:enable(test_eviction, undefined), + %% Can't enable with different kind ?assertMatch( {error, eviction_agent_busy}, emqx_eviction_agent:enable(bar, undefined) ), + %% Enable with the same kind but different server ref ?assertMatch( ok, emqx_eviction_agent:enable(test_eviction, <<"srv">>) @@ -99,6 +120,39 @@ t_enable_disable(_Config) -> emqtt_connect() ), + %% Enable with the same kind and server ref and explicit options + ?assertMatch( + ok, + emqx_eviction_agent:enable(test_eviction, <<"srv">>, #{allow_connections => false}) + ), + + ?assertMatch( + {enabled, #{}}, + emqx_eviction_agent:status() + ), + + ?assertMatch( + {error, {use_another_server, #{}}}, + emqtt_connect() + ), + + %% Enable with the same kind and server ref and permissive options + ?assertMatch( + ok, + emqx_eviction_agent:enable(test_eviction, <<"srv">>, #{allow_connections => true}) + ), + + ?assertMatch( + {enabled, #{}}, + emqx_eviction_agent:status() + ), + + ?assertMatch( + {ok, _}, + emqtt_connect() + ), + + %% Can't enable using different kind ?assertMatch( {error, eviction_agent_busy}, emqx_eviction_agent:disable(bar) diff --git a/apps/emqx_eviction_agent/test/emqx_eviction_agent_api_SUITE.erl b/apps/emqx_eviction_agent/test/emqx_eviction_agent_api_SUITE.erl index 3fe15e53a..341f543a7 100644 --- a/apps/emqx_eviction_agent/test/emqx_eviction_agent_api_SUITE.erl +++ b/apps/emqx_eviction_agent/test/emqx_eviction_agent_api_SUITE.erl @@ -22,12 +22,23 @@ all() -> emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> - emqx_mgmt_api_test_util:init_suite([emqx_eviction_agent]), - Config. + Apps = emqx_cth_suite:start( + [ + emqx, + emqx_eviction_agent, + emqx_management, + {emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"} + ], + #{ + work_dir => emqx_cth_suite:work_dir(Config) + } + ), + _ = emqx_common_test_http:create_default_app(), + [{apps, Apps} | Config]. end_per_suite(Config) -> - emqx_mgmt_api_test_util:end_suite([emqx_eviction_agent]), - Config. + emqx_common_test_http:delete_default_app(), + emqx_cth_suite:stop(?config(apps, Config)). %%-------------------------------------------------------------------- %% Tests diff --git a/apps/emqx_eviction_agent/test/emqx_eviction_agent_channel_SUITE.erl b/apps/emqx_eviction_agent/test/emqx_eviction_agent_channel_SUITE.erl index b4d7ceb08..d87429339 100644 --- a/apps/emqx_eviction_agent/test/emqx_eviction_agent_channel_SUITE.erl +++ b/apps/emqx_eviction_agent/test/emqx_eviction_agent_channel_SUITE.erl @@ -22,12 +22,20 @@ all() -> emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> - emqx_common_test_helpers:start_apps([emqx_conf, emqx_eviction_agent]), - {ok, _} = emqx:update_config([rpc, port_discovery], manual), - Config. + Apps = emqx_cth_suite:start( + [ + emqx_conf, + emqx, + emqx_eviction_agent + ], + #{ + work_dir => emqx_cth_suite:work_dir(Config) + } + ), + [{apps, Apps} | Config]. -end_per_suite(_Config) -> - emqx_common_test_helpers:stop_apps([emqx_eviction_agent, emqx_conf]). +end_per_suite(Config) -> + ok = emqx_cth_suite:stop(?config(apps, Config)). %%-------------------------------------------------------------------- %% Tests diff --git a/apps/emqx_eviction_agent/test/emqx_eviction_agent_cli_SUITE.erl b/apps/emqx_eviction_agent/test/emqx_eviction_agent_cli_SUITE.erl index 4cfb2fff5..70abd076f 100644 --- a/apps/emqx_eviction_agent/test/emqx_eviction_agent_cli_SUITE.erl +++ b/apps/emqx_eviction_agent/test/emqx_eviction_agent_cli_SUITE.erl @@ -14,13 +14,21 @@ all() -> emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> - emqx_common_test_helpers:start_apps([emqx_eviction_agent]), - Config. + Apps = emqx_cth_suite:start( + [ + emqx, + emqx_eviction_agent + ], + #{ + work_dir => emqx_cth_suite:work_dir(Config) + } + ), + [{apps, Apps} | Config]. end_per_suite(Config) -> _ = emqx_eviction_agent:disable(foo), - emqx_common_test_helpers:stop_apps([emqx_eviction_agent]), - Config. + + emqx_cth_suite:stop(?config(apps, Config)). %%-------------------------------------------------------------------- %% Tests diff --git a/apps/emqx_eviction_agent/test/emqx_eviction_agent_test_helpers.erl b/apps/emqx_eviction_agent/test/emqx_eviction_agent_test_helpers.erl index b3b3e8767..052f37952 100644 --- a/apps/emqx_eviction_agent/test/emqx_eviction_agent_test_helpers.erl +++ b/apps/emqx_eviction_agent/test/emqx_eviction_agent_test_helpers.erl @@ -15,13 +15,15 @@ emqtt_try_connect/1, - start_cluster/2, start_cluster/3, - stop_cluster/2, + stop_cluster/1, case_specific_node_name/2, case_specific_node_name/3, - concat_atoms/1 + concat_atoms/1, + + get_mqtt_port/2, + nodes_with_mqtt_tcp_ports/1 ]). emqtt_connect() -> @@ -83,52 +85,24 @@ emqtt_try_connect(Opts) -> Error end. -start_cluster(NamesWithPorts, Apps) -> - start_cluster(NamesWithPorts, Apps, []). - -start_cluster(NamesWithPorts, Apps, Env) -> - Specs = lists:map( - fun({ShortName, Port}) -> - {core, ShortName, #{listener_ports => [{tcp, Port}]}} - end, - NamesWithPorts +start_cluster(Config, NodeNames = [Node1 | _], Apps) -> + Spec = #{ + role => core, + join_to => emqx_cth_cluster:node_name(Node1), + listeners => true, + apps => Apps + }, + Cluster = [{NodeName, Spec} || NodeName <- NodeNames], + ClusterNodes = emqx_cth_cluster:start( + Cluster, + %% Use Node1 to scope the work dirs for all the nodes + #{work_dir => emqx_cth_suite:work_dir(Node1, Config)} ), - Opts0 = [ - {env, Env}, - {apps, Apps}, - {conf, - [{[listeners, Proto, default, enable], false} || Proto <- [ssl, ws, wss]] ++ - [{[rpc, mode], async}]} - ], - Cluster = emqx_common_test_helpers:emqx_cluster( - Specs, - Opts0 - ), - NodesWithPorts = [ - { - emqx_common_test_helpers:start_slave(Name, Opts), - proplists:get_value(Name, NamesWithPorts) - } - || {Name, Opts} <- Cluster - ], - NodesWithPorts. + nodes_with_mqtt_tcp_ports(ClusterNodes). -stop_cluster(NodesWithPorts, Apps) -> - lists:foreach( - fun({Node, _Port}) -> - lists:foreach( - fun(App) -> - rpc:call(Node, application, stop, [App]) - end, - Apps - ), - %% This sleep is just to make logs cleaner - ct:sleep(100), - _ = rpc:call(Node, emqx_common_test_helpers, stop_apps, []), - emqx_common_test_helpers:stop_slave(Node) - end, - NodesWithPorts - ). +stop_cluster(NamesWithPorts) -> + {Nodes, _Ports} = lists:unzip(NamesWithPorts), + ok = emqx_cth_cluster:stop(Nodes). case_specific_node_name(Module, Case) -> concat_atoms([Module, '__', Case]). @@ -145,3 +119,15 @@ concat_atoms(Atoms) -> ) ) ). + +get_mqtt_port(Node, Type) -> + {_IP, Port} = erpc:call(Node, emqx_config, get, [[listeners, Type, default, bind]]), + Port. + +nodes_with_mqtt_tcp_ports(Nodes) -> + lists:map( + fun(Node) -> + {Node, get_mqtt_port(Node, tcp)} + end, + Nodes + ). diff --git a/apps/emqx_node_rebalance/src/emqx_node_rebalance.app.src b/apps/emqx_node_rebalance/src/emqx_node_rebalance.app.src index c66ec9f23..beb5f2abb 100644 --- a/apps/emqx_node_rebalance/src/emqx_node_rebalance.app.src +++ b/apps/emqx_node_rebalance/src/emqx_node_rebalance.app.src @@ -1,11 +1,12 @@ {application, emqx_node_rebalance, [ {description, "EMQX Node Rebalance"}, - {vsn, "5.0.6"}, + {vsn, "5.0.7"}, {registered, [ emqx_node_rebalance_sup, emqx_node_rebalance, emqx_node_rebalance_agent, - emqx_node_rebalance_evacuation + emqx_node_rebalance_evacuation, + emqx_node_rebalance_purge ]}, {applications, [ kernel, diff --git a/apps/emqx_node_rebalance/src/emqx_node_rebalance.erl b/apps/emqx_node_rebalance/src/emqx_node_rebalance.erl index b2044c5fa..f9f9fc70e 100644 --- a/apps/emqx_node_rebalance/src/emqx_node_rebalance.erl +++ b/apps/emqx_node_rebalance/src/emqx_node_rebalance.erl @@ -41,6 +41,8 @@ start_error/0 ]). +-define(ENABLE_KIND, ?MODULE). + %%-------------------------------------------------------------------- %% APIs %%-------------------------------------------------------------------- @@ -143,9 +145,13 @@ handle_event( state_timeout, evict_conns, wait_health_check, - Data + #{donors := DonorNodes} = Data ) -> ?SLOG(warning, #{msg => "node_rebalance_wait_health_check_over"}), + _ = multicall(DonorNodes, enable_rebalance_agent, [ + self(), ?ENABLE_KIND, #{allow_connections => false} + ]), + ?tp(debug, node_rebalance_enable_started_prohibiting, #{}), {next_state, evicting_conns, Data, [{state_timeout, 0, evict_conns}]}; handle_event( state_timeout, @@ -232,7 +238,9 @@ enable_rebalance(#{opts := Opts} = Data) -> false -> {error, nothing_to_balance}; true -> - _ = multicall(DonorNodes, enable_rebalance_agent, [self()]), + _ = multicall(DonorNodes, enable_rebalance_agent, [ + self(), ?ENABLE_KIND, #{allow_connections => true} + ]), {ok, Data#{ donors => DonorNodes, recipients => RecipientNodes, @@ -242,7 +250,7 @@ enable_rebalance(#{opts := Opts} = Data) -> end. disable_rebalance(#{donors := DonorNodes}) -> - _ = multicall(DonorNodes, disable_rebalance_agent, [self()]), + _ = multicall(DonorNodes, disable_rebalance_agent, [self(), ?ENABLE_KIND]), ok. evict_conns(#{donors := DonorNodes, recipients := RecipientNodes, opts := Opts} = Data) -> @@ -370,7 +378,7 @@ avg(List) when length(List) >= 1 -> lists:sum(List) / length(List). multicall(Nodes, F, A) -> - case apply(emqx_node_rebalance_proto_v2, F, [Nodes | A]) of + case apply(emqx_node_rebalance_proto_v3, F, [Nodes | A]) of {Results, []} -> case lists:partition(fun is_ok/1, lists:zip(Nodes, Results)) of {OkResults, []} -> diff --git a/apps/emqx_node_rebalance/src/emqx_node_rebalance_agent.erl b/apps/emqx_node_rebalance/src/emqx_node_rebalance_agent.erl index 250d03d9c..088d27b6b 100644 --- a/apps/emqx_node_rebalance/src/emqx_node_rebalance_agent.erl +++ b/apps/emqx_node_rebalance/src/emqx_node_rebalance_agent.erl @@ -11,10 +11,13 @@ -include_lib("stdlib/include/qlc.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). +-behaviour(gen_statem). + -export([ start_link/0, enable/1, enable/2, + enable/3, disable/1, disable/2, status/0 @@ -22,13 +25,13 @@ -export([ init/1, - handle_call/3, - handle_info/2, - handle_cast/2, - code_change/3 + callback_mode/0, + handle_event/4, + code_change/4 ]). -define(ENABLE_KIND, emqx_node_rebalance). +-define(SERVER_REFERENCE, undefined). %%-------------------------------------------------------------------- %% APIs @@ -38,16 +41,21 @@ -spec start_link() -> startlink_ret(). start_link() -> - gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + gen_statem:start_link({local, ?MODULE}, ?MODULE, [], []). -spec enable(pid()) -> ok_or_error(already_enabled | eviction_agent_busy). enable(CoordinatorPid) -> enable(CoordinatorPid, ?ENABLE_KIND). -spec enable(pid(), emqx_eviction_agent:kind()) -> - ok_or_error(already_enabled | eviction_agent_busy). + ok_or_error(invalid_coordinator | eviction_agent_busy). enable(CoordinatorPid, Kind) -> - gen_server:call(?MODULE, {enable, CoordinatorPid, Kind}). + enable(CoordinatorPid, Kind, emqx_eviction_agent:default_options()). + +-spec enable(pid(), emqx_eviction_agent:kind(), emqx_eviction_agent:options()) -> + ok_or_error(invalid_coordinator | eviction_agent_busy). +enable(CoordinatorPid, Kind, Options) -> + gen_statem:call(?MODULE, {enable, CoordinatorPid, Kind, Options}). -spec disable(pid()) -> ok_or_error(already_disabled | invalid_coordinator). disable(CoordinatorPid) -> @@ -56,88 +64,113 @@ disable(CoordinatorPid) -> -spec disable(pid(), emqx_eviction_agent:kind()) -> ok_or_error(already_disabled | invalid_coordinator). disable(CoordinatorPid, Kind) -> - gen_server:call(?MODULE, {disable, CoordinatorPid, Kind}). + gen_statem:call(?MODULE, {disable, CoordinatorPid, Kind}). -spec status() -> status(). status() -> - gen_server:call(?MODULE, status). + gen_statem:call(?MODULE, status). %%-------------------------------------------------------------------- -%% gen_server callbacks +%% gen_statem callbacks %%-------------------------------------------------------------------- +-define(disabled, disabled). +-define(enabled(ST), {enabled, ST}). + +callback_mode() -> + handle_event_function. + init([]) -> - {ok, #{}}. + {ok, ?disabled, #{}}. -handle_call({enable, CoordinatorPid, Kind}, _From, St) -> - case St of - #{coordinator_pid := _Pid} -> - {reply, {error, already_enabled}, St}; - _ -> - true = link(CoordinatorPid), - EvictionAgentPid = whereis(emqx_eviction_agent), - true = link(EvictionAgentPid), - case emqx_eviction_agent:enable(Kind, undefined) of - ok -> - {reply, ok, #{ - coordinator_pid => CoordinatorPid, - eviction_agent_pid => EvictionAgentPid - }}; - {error, eviction_agent_busy} -> - true = unlink(EvictionAgentPid), - true = unlink(CoordinatorPid), - {reply, {error, eviction_agent_busy}, St} - end - end; -handle_call({disable, CoordinatorPid, Kind}, _From, St) -> - case St of - #{ - coordinator_pid := CoordinatorPid, - eviction_agent_pid := EvictionAgentPid - } -> - _ = emqx_eviction_agent:disable(Kind), +%% disabled status + +%% disabled status, enable command +handle_event({call, From}, {enable, CoordinatorPid, Kind, Options}, ?disabled, Data) -> + true = link(CoordinatorPid), + EvictionAgentPid = whereis(emqx_eviction_agent), + true = link(EvictionAgentPid), + case emqx_eviction_agent:enable(Kind, ?SERVER_REFERENCE, Options) of + ok -> + {next_state, + ?enabled(#{ + coordinator_pid => CoordinatorPid, + eviction_agent_pid => EvictionAgentPid, + kind => Kind + }), Data, {reply, From, ok}}; + {error, eviction_agent_busy} -> true = unlink(EvictionAgentPid), true = unlink(CoordinatorPid), - NewSt = maps:without( - [coordinator_pid, eviction_agent_pid], - St - ), - {reply, ok, NewSt}; - #{coordinator_pid := _CoordinatorPid} -> - {reply, {error, invalid_coordinator}, St}; - #{} -> - {reply, {error, already_disabled}, St} + {keep_state_and_data, {reply, From, {error, eviction_agent_busy}}} end; -handle_call(status, _From, St) -> - case St of - #{coordinator_pid := Pid} -> - {reply, {enabled, Pid}, St}; - _ -> - {reply, disabled, St} - end; -handle_call(Msg, _From, St) -> +%% disabled status, disable command +handle_event({call, From}, {disable, _CoordinatorPid, _Kind}, ?disabled, _Data) -> + {keep_state_and_data, {reply, From, {error, already_disabled}}}; +%% disabled status, status command +handle_event({call, From}, status, ?disabled, _Data) -> + {keep_state_and_data, {reply, From, disabled}}; +%% enabled status + +%% enabled status, enable command +handle_event( + {call, From}, + {enable, CoordinatorPid, Kind, Options}, + ?enabled(#{ + coordinator_pid := CoordinatorPid, + kind := Kind + }), + _Data +) -> + %% just updating options + ok = emqx_eviction_agent:enable(Kind, ?SERVER_REFERENCE, Options), + {keep_state_and_data, {reply, From, ok}}; +handle_event({call, From}, {enable, _CoordinatorPid, _Kind, _Options}, ?enabled(_St), _Data) -> + {keep_state_and_data, {reply, From, {error, invalid_coordinator}}}; +%% enabled status, disable command +handle_event( + {call, From}, + {disable, CoordinatorPid, Kind}, + ?enabled(#{ + coordinator_pid := CoordinatorPid, + eviction_agent_pid := EvictionAgentPid + }), + Data +) -> + _ = emqx_eviction_agent:disable(Kind), + true = unlink(EvictionAgentPid), + true = unlink(CoordinatorPid), + {next_state, ?disabled, Data, {reply, From, ok}}; +handle_event({call, From}, {disable, _CoordinatorPid, _Kind}, ?enabled(_St), _Data) -> + {keep_state_and_data, {reply, From, {error, invalid_coordinator}}}; +%% enabled status, status command +handle_event({call, From}, status, ?enabled(#{coordinator_pid := CoordinatorPid}), _Data) -> + {keep_state_and_data, {reply, From, {enabled, CoordinatorPid}}}; +%% fallbacks + +handle_event({call, From}, Msg, State, Data) -> ?SLOG(warning, #{ msg => "unknown_call", call => Msg, - state => St + state => State, + data => Data }), - {reply, ignored, St}. - -handle_info(Msg, St) -> - ?SLOG(warning, #{ - msg => "unknown_info", - info => Msg, - state => St - }), - {noreply, St}. - -handle_cast(Msg, St) -> + {keep_state_and_data, {reply, From, ignored}}; +handle_event(cast, Msg, State, Data) -> ?SLOG(warning, #{ msg => "unknown_cast", cast => Msg, - state => St + state => State, + data => Data }), - {noreply, St}. + keep_state_and_data; +handle_event(info, Msg, State, Data) -> + ?SLOG(warning, #{ + msg => "unknown_info", + info => Msg, + state => State, + data => Data + }), + keep_state_and_data. -code_change(_Vsn, State, _Extra) -> - {ok, State}. +code_change(_Vsn, State, Data, _Extra) -> + {ok, State, Data}. diff --git a/apps/emqx_node_rebalance/src/emqx_node_rebalance_api.erl b/apps/emqx_node_rebalance/src/emqx_node_rebalance_api.erl index 44ac0c291..a8f788abc 100644 --- a/apps/emqx_node_rebalance/src/emqx_node_rebalance_api.erl +++ b/apps/emqx_node_rebalance/src/emqx_node_rebalance_api.erl @@ -109,7 +109,8 @@ schema("/load_rebalance/availability_check") -> responses => #{ 200 => response_schema(), 503 => error_codes([?NODE_EVACUATING], <<"Node Evacuating">>) - } + }, + security => [] } }; schema("/load_rebalance/:node/start") -> @@ -248,10 +249,10 @@ schema("/load_rebalance/:node/evacuation/stop") -> }}. '/load_rebalance/availability_check'(get, #{}) -> - case emqx_node_rebalance_status:local_status() of - disabled -> + case emqx_node_rebalance_status:availability_status() of + available -> {200, #{}}; - _ -> + unavailable -> error_response(503, ?NODE_EVACUATING, <<"Node Evacuating">>) end. diff --git a/apps/emqx_node_rebalance/src/emqx_node_rebalance_evacuation.erl b/apps/emqx_node_rebalance/src/emqx_node_rebalance_evacuation.erl index 6b6aa0675..11c0df3fa 100644 --- a/apps/emqx_node_rebalance/src/emqx_node_rebalance_evacuation.erl +++ b/apps/emqx_node_rebalance/src/emqx_node_rebalance_evacuation.erl @@ -57,7 +57,7 @@ migrate_to => migrate_to(), wait_health_check => number() }. --type start_error() :: already_started. +-type start_error() :: already_started | eviction_agent_busy. -type stats() :: #{ initial_conns := non_neg_integer(), initial_sessions := non_neg_integer(), @@ -102,9 +102,9 @@ callback_mode() -> handle_event_function. init([]) -> case emqx_node_rebalance_evacuation_persist:read(default_opts()) of - {ok, #{server_reference := ServerReference} = Opts} -> + {ok, Opts} -> ?SLOG(warning, #{msg => "restoring_evacuation_state", opts => Opts}), - case emqx_eviction_agent:enable(?MODULE, ServerReference) of + case enable_eviction_agent(Opts, _AllowConnections = false) of ok -> Data = init_data(#{}, Opts), ok = warn_enabled(), @@ -122,18 +122,26 @@ handle_event( {call, From}, {start, #{wait_health_check := WaitHealthCheck} = Opts}, disabled, - #{} = Data + Data ) -> - ?SLOG(warning, #{ - msg => "node_evacuation_started", - opts => Opts - }), - NewData = init_data(Data, Opts), - ok = emqx_node_rebalance_evacuation_persist:save(Opts), - {next_state, waiting_health_check, NewData, [ - {state_timeout, seconds(WaitHealthCheck), start_eviction}, - {reply, From, ok} - ]}; + case enable_eviction_agent(Opts, _AllowConnections = true) of + ok -> + ?SLOG(warning, #{ + msg => "node_evacuation_started", + opts => Opts + }), + NewData = init_data(Data, Opts), + ok = emqx_node_rebalance_evacuation_persist:save(Opts), + {next_state, waiting_health_check, NewData, [ + {state_timeout, seconds(WaitHealthCheck), start_eviction}, + {reply, From, ok} + ]}; + {error, eviction_agent_busy} -> + ?tp(warning, eviction_agent_busy, #{ + data => Data + }), + {keep_state_and_data, [{reply, From, {error, eviction_agent_busy}}]} + end; handle_event({call, From}, {start, _Opts}, _State, #{}) -> {keep_state_and_data, [{reply, From, {error, already_started}}]}; %% stop @@ -168,9 +176,9 @@ handle_event( state_timeout, start_eviction, waiting_health_check, - #{server_reference := ServerReference} = Data + Data ) -> - case emqx_eviction_agent:enable(?MODULE, ServerReference) of + case enable_eviction_agent(Data, _AllowConnections = false) of ok -> ?tp(debug, eviction_agent_started, #{ data => Data @@ -178,10 +186,8 @@ handle_event( {next_state, evicting_conns, Data, [ {state_timeout, 0, evict_conns} ]}; + %% This should never happen {error, eviction_agent_busy} -> - ?tp(warning, eviction_agent_busy, #{ - data => Data - }), {next_state, disabled, deinit(Data)} end; %% conn eviction @@ -212,7 +218,7 @@ handle_event( NewData = Data#{current_conns => 0}, ?SLOG(warning, #{msg => "node_evacuation_evict_conns_done"}), {next_state, waiting_takeover, NewData, [ - {state_timeout, timer:seconds(WaitTakeover), evict_sessions} + {state_timeout, seconds(WaitTakeover), evict_sessions} ]} end; handle_event( @@ -308,6 +314,9 @@ deinit(Data) -> maps:keys(default_opts()), maps:without(Keys, Data). +enable_eviction_agent(#{server_reference := ServerReference} = _Opts, AllowConnections) -> + emqx_eviction_agent:enable(?MODULE, ServerReference, #{allow_connections => AllowConnections}). + warn_enabled() -> ?SLOG(warning, #{msg => "node_evacuation_enabled"}), io:format( diff --git a/apps/emqx_node_rebalance/src/emqx_node_rebalance_purge.erl b/apps/emqx_node_rebalance/src/emqx_node_rebalance_purge.erl index 81f1bfe03..17f4bd574 100644 --- a/apps/emqx_node_rebalance/src/emqx_node_rebalance_purge.erl +++ b/apps/emqx_node_rebalance/src/emqx_node_rebalance_purge.erl @@ -199,7 +199,7 @@ deinit(Data) -> maps:without(Keys, Data). multicall(Nodes, F, A) -> - case apply(emqx_node_rebalance_proto_v2, F, [Nodes | A]) of + case apply(emqx_node_rebalance_proto_v3, F, [Nodes | A]) of {Results, []} -> case lists:partition(fun is_ok/1, lists:zip(Nodes, Results)) of {_OkResults, []} -> diff --git a/apps/emqx_node_rebalance/src/emqx_node_rebalance_status.erl b/apps/emqx_node_rebalance/src/emqx_node_rebalance_status.erl index dbeb4d97f..d2cf02ef9 100644 --- a/apps/emqx_node_rebalance/src/emqx_node_rebalance_status.erl +++ b/apps/emqx_node_rebalance/src/emqx_node_rebalance_status.erl @@ -5,6 +5,7 @@ -module(emqx_node_rebalance_status). -export([ + availability_status/0, local_status/0, local_status/1, global_status/0, @@ -23,6 +24,13 @@ %% APIs %%-------------------------------------------------------------------- +-spec availability_status() -> available | unavailable. +availability_status() -> + case emqx_eviction_agent:enable_status() of + {enabled, _Kind, _ServerReference, _Options} -> unavailable; + disabled -> available + end. + -spec local_status() -> disabled | {evacuation, map()} | {purge, map()} | {rebalance, map()}. local_status() -> Checks = [ diff --git a/apps/emqx_node_rebalance/src/proto/emqx_node_rebalance_proto_v3.erl b/apps/emqx_node_rebalance/src/proto/emqx_node_rebalance_proto_v3.erl new file mode 100644 index 000000000..ab7943a6f --- /dev/null +++ b/apps/emqx_node_rebalance/src/proto/emqx_node_rebalance_proto_v3.erl @@ -0,0 +1,96 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_node_rebalance_proto_v3). + +-behaviour(emqx_bpapi). + +-export([ + introduced_in/0, + + available_nodes/1, + evict_connections/2, + evict_sessions/4, + connection_counts/1, + session_counts/1, + enable_rebalance_agent/2, + disable_rebalance_agent/2, + disconnected_session_counts/1, + + %% Introduced in v2: + enable_rebalance_agent/3, + disable_rebalance_agent/3, + purge_sessions/2, + + %% Introduced in v3: + enable_rebalance_agent/4 +]). + +-include_lib("emqx/include/bpapi.hrl"). +-include_lib("emqx/include/types.hrl"). + +introduced_in() -> + "5.4.0". + +-spec available_nodes([node()]) -> emqx_rpc:multicall_result(node()). +available_nodes(Nodes) -> + rpc:multicall(Nodes, emqx_node_rebalance, is_node_available, []). + +-spec evict_connections([node()], non_neg_integer()) -> + emqx_rpc:multicall_result(ok_or_error(disabled)). +evict_connections(Nodes, Count) -> + rpc:multicall(Nodes, emqx_eviction_agent, evict_connections, [Count]). + +-spec evict_sessions([node()], non_neg_integer(), [node()], emqx_channel:conn_state()) -> + emqx_rpc:multicall_result(ok_or_error(disabled)). +evict_sessions(Nodes, Count, RecipientNodes, ConnState) -> + rpc:multicall(Nodes, emqx_eviction_agent, evict_sessions, [Count, RecipientNodes, ConnState]). + +-spec connection_counts([node()]) -> emqx_rpc:multicall_result({ok, non_neg_integer()}). +connection_counts(Nodes) -> + rpc:multicall(Nodes, emqx_node_rebalance, connection_count, []). + +-spec session_counts([node()]) -> emqx_rpc:multicall_result({ok, non_neg_integer()}). +session_counts(Nodes) -> + rpc:multicall(Nodes, emqx_node_rebalance, session_count, []). + +-spec enable_rebalance_agent([node()], pid()) -> + emqx_rpc:multicall_result(ok_or_error(already_enabled | eviction_agent_busy)). +enable_rebalance_agent(Nodes, OwnerPid) -> + rpc:multicall(Nodes, emqx_node_rebalance_agent, enable, [OwnerPid]). + +-spec disable_rebalance_agent([node()], pid()) -> + emqx_rpc:multicall_result(ok_or_error(already_disabled | invalid_coordinator)). +disable_rebalance_agent(Nodes, OwnerPid) -> + rpc:multicall(Nodes, emqx_node_rebalance_agent, disable, [OwnerPid]). + +-spec disconnected_session_counts([node()]) -> emqx_rpc:multicall_result({ok, non_neg_integer()}). +disconnected_session_counts(Nodes) -> + rpc:multicall(Nodes, emqx_node_rebalance, disconnected_session_count, []). + +%% Introduced in v2: + +-spec enable_rebalance_agent([node()], pid(), emqx_eviction_agent:kind()) -> + emqx_rpc:multicall_result(ok_or_error(already_enabled | eviction_agent_busy)). +enable_rebalance_agent(Nodes, OwnerPid, Kind) -> + rpc:multicall(Nodes, emqx_node_rebalance_agent, enable, [OwnerPid, Kind]). + +-spec disable_rebalance_agent([node()], pid(), emqx_eviction_agent:kind()) -> + emqx_rpc:multicall_result(ok_or_error(already_disabled | invalid_coordinator)). +disable_rebalance_agent(Nodes, OwnerPid, Kind) -> + rpc:multicall(Nodes, emqx_node_rebalance_agent, disable, [OwnerPid, Kind]). + +-spec purge_sessions([node()], non_neg_integer()) -> + emqx_rpc:multicall_result(ok_or_error(disabled)). +purge_sessions(Nodes, Count) -> + rpc:multicall(Nodes, emqx_eviction_agent, purge_sessions, [Count]). + +%% Introduced in v3: + +-spec enable_rebalance_agent( + [node()], pid(), emqx_eviction_agent:kind(), emqx_eviction_agent:options() +) -> + emqx_rpc:multicall_result(ok_or_error(eviction_agent_busy | invalid_coordinator)). +enable_rebalance_agent(Nodes, OwnerPid, Kind, Options) -> + rpc:multicall(Nodes, emqx_node_rebalance_agent, enable, [OwnerPid, Kind, Options]). diff --git a/apps/emqx_node_rebalance/test/emqx_node_rebalance_SUITE.erl b/apps/emqx_node_rebalance/test/emqx_node_rebalance_SUITE.erl index a818145a2..d996719fb 100644 --- a/apps/emqx_node_rebalance/test/emqx_node_rebalance_SUITE.erl +++ b/apps/emqx_node_rebalance/test/emqx_node_rebalance_SUITE.erl @@ -16,39 +16,46 @@ -import( emqx_eviction_agent_test_helpers, - [emqtt_connect_many/1, emqtt_connect_many/2, stop_many/1, case_specific_node_name/3] + [ + emqtt_connect_many/1, + emqtt_connect_many/2, + emqtt_try_connect/1, + stop_many/1, + case_specific_node_name/3, + start_cluster/3, + stop_cluster/1 + ] ). --define(START_APPS, [emqx_eviction_agent, emqx_node_rebalance]). - all() -> emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> - ok = emqx_common_test_helpers:start_apps([]), - Config. + Apps = emqx_cth_suite:start([emqx], #{ + work_dir => ?config(priv_dir, Config) + }), + [{apps, Apps} | Config]. -end_per_suite(_Config) -> - ok = emqx_common_test_helpers:stop_apps([]), - ok. +end_per_suite(Config) -> + emqx_cth_suite:stop(?config(apps, Config)). init_per_testcase(Case, Config) -> - ClusterNodes = emqx_eviction_agent_test_helpers:start_cluster( + NodeNames = [ - {case_specific_node_name(?MODULE, Case, '_donor'), 2883}, - {case_specific_node_name(?MODULE, Case, '_recipient'), 3883} + case_specific_node_name(?MODULE, Case, '_donor'), + case_specific_node_name(?MODULE, Case, '_recipient') ], - ?START_APPS + ClusterNodes = start_cluster( + Config, + NodeNames, + [emqx, emqx_eviction_agent, emqx_node_rebalance] ), ok = snabbkaffe:start_trace(), [{cluster_nodes, ClusterNodes} | Config]. end_per_testcase(_Case, Config) -> ok = snabbkaffe:stop(), - ok = emqx_eviction_agent_test_helpers:stop_cluster( - ?config(cluster_nodes, Config), - ?START_APPS - ). + stop_cluster(?config(cluster_nodes, Config)). %%-------------------------------------------------------------------- %% Tests @@ -227,3 +234,43 @@ t_available_nodes(Config) -> [[DonorNode, RecipientNode]] ) ). + +t_before_health_check_over(Config) -> + process_flag(trap_exit, true), + + [{DonorNode, DonorPort}, {RecipientNode, _RecipientPort}] = ?config(cluster_nodes, Config), + + Nodes = [DonorNode, RecipientNode], + + Conns = emqtt_connect_many(DonorPort, 50), + + Opts = #{ + conn_evict_rate => 1, + sess_evict_rate => 1, + evict_interval => 1000, + abs_conn_threshold => 1, + abs_sess_threshold => 1, + rel_conn_threshold => 1.0, + rel_sess_threshold => 1.0, + wait_health_check => 2, + wait_takeover => 100, + nodes => Nodes + }, + + ?assertWaitEvent( + begin + ok = rpc:call(DonorNode, emqx_node_rebalance, start, [Opts]), + ?assertMatch( + ok, + emqtt_try_connect([{port, DonorPort}]) + ) + end, + #{?snk_kind := node_rebalance_enable_started_prohibiting}, + 5000 + ), + ?assertMatch( + {error, {use_another_server, #{}}}, + emqtt_try_connect([{port, DonorPort}]) + ), + + stop_many(Conns). diff --git a/apps/emqx_node_rebalance/test/emqx_node_rebalance_agent_SUITE.erl b/apps/emqx_node_rebalance/test/emqx_node_rebalance_agent_SUITE.erl index 8b21f9433..9b36fe616 100644 --- a/apps/emqx_node_rebalance/test/emqx_node_rebalance_agent_SUITE.erl +++ b/apps/emqx_node_rebalance/test/emqx_node_rebalance_agent_SUITE.erl @@ -38,12 +38,13 @@ groups() -> ]. init_per_suite(Config) -> - ok = emqx_common_test_helpers:start_apps([emqx_eviction_agent, emqx_node_rebalance]), - Config. + Apps = emqx_cth_suite:start([emqx, emqx_eviction_agent, emqx_node_rebalance], #{ + work_dir => ?config(priv_dir, Config) + }), + [{apps, Apps} | Config]. -end_per_suite(_Config) -> - ok = emqx_common_test_helpers:stop_apps([emqx_eviction_agent, emqx_node_rebalance]), - ok. +end_per_suite(Config) -> + emqx_cth_suite:stop(?config(apps, Config)). init_per_group(local, Config) -> [{cluster, false} | Config]; @@ -56,9 +57,13 @@ end_per_group(_Group, _Config) -> init_per_testcase(Case, Config) -> case ?config(cluster, Config) of true -> - ClusterNodes = emqx_eviction_agent_test_helpers:start_cluster( - [{case_specific_node_name(?MODULE, Case), 2883}], - [emqx_eviction_agent, emqx_node_rebalance] + ClusterNodes = emqx_cth_cluster:start( + [ + {case_specific_node_name(?MODULE, Case), #{ + apps => [emqx, emqx_eviction_agent, emqx_node_rebalance] + }} + ], + #{work_dir => emqx_cth_suite:work_dir(Case, Config)} ), [{cluster_nodes, ClusterNodes} | Config]; false -> @@ -68,10 +73,7 @@ init_per_testcase(Case, Config) -> end_per_testcase(_Case, Config) -> case ?config(cluster, Config) of true -> - emqx_eviction_agent_test_helpers:stop_cluster( - ?config(cluster_nodes, Config), - [emqx_eviction_agent, emqx_node_rebalance] - ); + emqx_cth_cluster:stop(?config(cluster_nodes, Config)); false -> ok end. @@ -94,7 +96,13 @@ t_enable_disable(_Config) -> ), ?assertEqual( - {error, already_enabled}, + {error, invalid_coordinator}, + emqx_node_rebalance_agent:enable(self(), other_rebalance) + ), + + %% Options update + ?assertEqual( + ok, emqx_node_rebalance_agent:enable(self()) ), @@ -150,7 +158,7 @@ t_unknown_messages(_Config) -> t_rebalance_agent_coordinator_fail(Config) -> process_flag(trap_exit, true), - [{Node, _}] = ?config(cluster_nodes, Config), + [Node] = ?config(cluster_nodes, Config), CoordinatorPid = spawn_link( fun() -> @@ -189,7 +197,7 @@ t_rebalance_agent_coordinator_fail(Config) -> t_rebalance_agent_fail(Config) -> process_flag(trap_exit, true), - [{Node, _}] = ?config(cluster_nodes, Config), + [Node] = ?config(cluster_nodes, Config), CoordinatorPid = spawn_link( fun() -> diff --git a/apps/emqx_node_rebalance/test/emqx_node_rebalance_api_SUITE.erl b/apps/emqx_node_rebalance/test/emqx_node_rebalance_api_SUITE.erl index 017e85971..8b8dc7e42 100644 --- a/apps/emqx_node_rebalance/test/emqx_node_rebalance_api_SUITE.erl +++ b/apps/emqx_node_rebalance/test/emqx_node_rebalance_api_SUITE.erl @@ -13,6 +13,7 @@ -import( emqx_mgmt_api_test_util, [ + request_api/3, request/2, request/3, uri/1 @@ -24,18 +25,17 @@ [emqtt_connect_many/2, stop_many/1, case_specific_node_name/3] ). --define(START_APPS, [emqx_eviction_agent, emqx_node_rebalance]). - all() -> emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> - ok = emqx_common_test_helpers:start_apps(?START_APPS), - Config. + Apps = emqx_cth_suite:start([emqx, emqx_eviction_agent, emqx_node_rebalance], #{ + work_dir => ?config(priv_dir, Config) + }), + [{apps, Apps} | Config]. -end_per_suite(_Config) -> - ok = emqx_common_test_helpers:stop_apps(?START_APPS), - ok. +end_per_suite(Config) -> + emqx_cth_suite:stop(?config(apps, Config)). init_per_testcase(Case, Config) -> DonorNode = case_specific_node_name(?MODULE, Case, '_donor'), @@ -57,7 +57,6 @@ init_per_testcase(Case, Config) -> [{cluster_nodes, ClusterNodes} | Config]. end_per_testcase(_Case, Config) -> Nodes = ?config(cluster_nodes, Config), - erpc:multicall(Nodes, meck, unload, []), _ = emqx_cth_cluster:stop(Nodes), ok. @@ -473,28 +472,31 @@ t_start_stop_rebalance(Config) -> t_availability_check(Config) -> [DonorNode | _] = ?config(cluster_nodes, Config), ?assertMatch( - {ok, 200, #{}}, - api_get(["load_rebalance", "availability_check"]) + {ok, _}, + api_get_noauth(["load_rebalance", "availability_check"]) ), ok = rpc:call(DonorNode, emqx_node_rebalance_evacuation, start, [#{}]), ?assertMatch( - {ok, 503, _}, - api_get(["load_rebalance", "availability_check"]) + {error, {_, 503, _}}, + api_get_noauth(["load_rebalance", "availability_check"]) ), ok = rpc:call(DonorNode, emqx_node_rebalance_evacuation, stop, []), ?assertMatch( - {ok, 200, #{}}, - api_get(["load_rebalance", "availability_check"]) + {ok, _}, + api_get_noauth(["load_rebalance", "availability_check"]) ). %%-------------------------------------------------------------------- %% Helpers %%-------------------------------------------------------------------- +api_get_noauth(Path) -> + request_api(get, uri(Path), emqx_common_test_http:auth_header("invalid", "password")). + api_get(Path) -> case request(get, uri(Path)) of {ok, Code, ResponseBody} -> diff --git a/apps/emqx_node_rebalance/test/emqx_node_rebalance_cli_SUITE.erl b/apps/emqx_node_rebalance/test/emqx_node_rebalance_cli_SUITE.erl index 7d0cab0ce..484a3efe5 100644 --- a/apps/emqx_node_rebalance/test/emqx_node_rebalance_cli_SUITE.erl +++ b/apps/emqx_node_rebalance/test/emqx_node_rebalance_cli_SUITE.erl @@ -15,27 +15,38 @@ [emqtt_connect_many/2, stop_many/1, case_specific_node_name/3] ). --define(START_APPS, [emqx_eviction_agent, emqx_node_rebalance]). +-define(START_APPS, [emqx, emqx_eviction_agent, emqx_node_rebalance]). all() -> emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> - emqx_common_test_helpers:start_apps(?START_APPS), - Config. + Apps = emqx_cth_suite:start(?START_APPS, #{ + work_dir => ?config(priv_dir, Config) + }), + [{apps, Apps} | Config]. end_per_suite(Config) -> - emqx_common_test_helpers:stop_apps(lists:reverse(?START_APPS)), - Config. + emqx_cth_suite:stop(?config(apps, Config)). init_per_testcase(Case = t_rebalance, Config) -> _ = emqx_node_rebalance_evacuation:stop(), - ClusterNodes = emqx_eviction_agent_test_helpers:start_cluster( + Nodes = + [Node1 | _] = [ - {case_specific_node_name(?MODULE, Case, '_donor'), 2883}, - {case_specific_node_name(?MODULE, Case, '_recipient'), 3883} + case_specific_node_name(?MODULE, Case, '_1'), + case_specific_node_name(?MODULE, Case, '_2') ], - ?START_APPS + Spec = #{ + role => core, + join_to => emqx_cth_cluster:node_name(Node1), + listeners => true, + apps => ?START_APPS + }, + Cluster = [{Node, Spec} || Node <- Nodes], + ClusterNodes = emqx_cth_cluster:start( + Cluster, + #{work_dir => emqx_cth_suite:work_dir(Case, Config)} ), [{cluster_nodes, ClusterNodes} | Config]; init_per_testcase(_Case, Config) -> @@ -46,10 +57,7 @@ init_per_testcase(_Case, Config) -> end_per_testcase(t_rebalance, Config) -> _ = emqx_node_rebalance_evacuation:stop(), _ = emqx_node_rebalance:stop(), - _ = emqx_eviction_agent_test_helpers:stop_cluster( - ?config(cluster_nodes, Config), - ?START_APPS - ); + _ = emqx_cth_cluster:stop(?config(cluster_nodes, Config)); end_per_testcase(_Case, _Config) -> _ = emqx_node_rebalance_evacuation:stop(), _ = emqx_node_rebalance:stop(). @@ -157,6 +165,8 @@ t_evacuation(_Config) -> ). t_purge(_Config) -> + process_flag(trap_exit, true), + %% start with invalid args ?assertNot( emqx_node_rebalance_cli:cli(["start", "--purge", "--foo-bar"]) @@ -187,40 +197,44 @@ t_purge(_Config) -> atom_to_list(node()) ]) ), - with_some_sessions(fun() -> - ?assert( - emqx_node_rebalance_cli:cli([ - "start", - "--purge", - "--purge-rate", - "10" - ]) - ), - %% status - ok = emqx_node_rebalance_cli:cli(["status"]), - ok = emqx_node_rebalance_cli:cli(["node-status"]), - ok = emqx_node_rebalance_cli:cli(["node-status", atom_to_list(node())]), + Conns = emqtt_connect_many(get_mqtt_port(node(), tcp), 100), - ?assertMatch( - {enabled, #{}}, - emqx_node_rebalance_purge:status() - ), + ?assert( + emqx_node_rebalance_cli:cli([ + "start", + "--purge", + "--purge-rate", + "10" + ]) + ), + + %% status + ok = emqx_node_rebalance_cli:cli(["status"]), + ok = emqx_node_rebalance_cli:cli(["node-status"]), + ok = emqx_node_rebalance_cli:cli(["node-status", atom_to_list(node())]), + + ?assertMatch( + {enabled, #{}}, + emqx_node_rebalance_purge:status() + ), + + %% already enabled + ?assertNot( + emqx_node_rebalance_cli:cli([ + "start", + "--purge", + "--purge-rate", + "10" + ]) + ), - %% already enabled - ?assertNot( - emqx_node_rebalance_cli:cli([ - "start", - "--purge", - "--purge-rate", - "10" - ]) - ), - true = emqx_node_rebalance_cli:cli(["stop"]), - ok - end), %% stop + true = emqx_node_rebalance_cli:cli(["stop"]), + + %% stop when not started + false = emqx_node_rebalance_cli:cli(["stop"]), ?assertEqual( @@ -228,12 +242,13 @@ t_purge(_Config) -> emqx_node_rebalance_purge:status() ), - ok. + ok = stop_many(Conns). t_rebalance(Config) -> process_flag(trap_exit, true), - [{DonorNode, DonorPort}, {RecipientNode, _}] = ?config(cluster_nodes, Config), + [DonorNode, RecipientNode] = ?config(cluster_nodes, Config), + DonorPort = get_mqtt_port(DonorNode, tcp), %% start with invalid args ?assertNot( @@ -364,11 +379,6 @@ emqx_node_rebalance_cli(Node, Args) -> Result end. -%% to avoid it finishing too fast -with_some_sessions(Fn) -> - emqx_common_test_helpers:with_mock( - emqx_eviction_agent, - all_channels_count, - fun() -> 100 end, - Fn - ). +get_mqtt_port(Node, Type) -> + {_IP, Port} = erpc:call(Node, emqx_config, get, [[listeners, Type, default, bind]]), + Port. diff --git a/apps/emqx_node_rebalance/test/emqx_node_rebalance_evacuation_SUITE.erl b/apps/emqx_node_rebalance/test/emqx_node_rebalance_evacuation_SUITE.erl index b7f1ebb63..945b1566d 100644 --- a/apps/emqx_node_rebalance/test/emqx_node_rebalance_evacuation_SUITE.erl +++ b/apps/emqx_node_rebalance/test/emqx_node_rebalance_evacuation_SUITE.erl @@ -15,7 +15,13 @@ -import( emqx_eviction_agent_test_helpers, - [emqtt_connect/1, emqtt_try_connect/1, case_specific_node_name/3] + [ + emqtt_connect/1, + emqtt_try_connect/1, + case_specific_node_name/3, + start_cluster/3, + stop_cluster/1 + ] ). all() -> [{group, one_node}, {group, two_node}]. @@ -37,12 +43,13 @@ one_node_cases() -> emqx_common_test_helpers:all(?MODULE) -- two_node_cases(). init_per_suite(Config) -> - ok = emqx_common_test_helpers:start_apps([]), - Config. + Apps = emqx_cth_suite:start([emqx], #{ + work_dir => ?config(priv_dir, Config) + }), + [{apps, Apps} | Config]. -end_per_suite(_Config) -> - ok = emqx_common_test_helpers:stop_apps([]), - ok. +end_per_suite(Config) -> + emqx_cth_suite:stop(?config(apps, Config)). init_per_group(one_node, Config) -> [{cluster_type, one_node} | Config]; @@ -53,30 +60,23 @@ end_per_group(_Group, _Config) -> ok. init_per_testcase(Case, Config) -> - NodesWithPorts = + NodeNames = case ?config(cluster_type, Config) of one_node -> - [{case_specific_node_name(?MODULE, Case, '_evacuated'), 2883}]; + [case_specific_node_name(?MODULE, Case, '_evacuated')]; two_node -> [ - {case_specific_node_name(?MODULE, Case, '_evacuated'), 2883}, - {case_specific_node_name(?MODULE, Case, '_recipient'), 3883} + case_specific_node_name(?MODULE, Case, '_evacuated'), + case_specific_node_name(?MODULE, Case, '_recipient') ] end, - ClusterNodes = emqx_eviction_agent_test_helpers:start_cluster( - NodesWithPorts, - [emqx_eviction_agent, emqx_node_rebalance], - [{emqx, data_dir, case_specific_data_dir(Case, Config)}] - ), + ClusterNodes = start_cluster(Config, NodeNames, [emqx, emqx_eviction_agent, emqx_node_rebalance]), ok = snabbkaffe:start_trace(), [{cluster_nodes, ClusterNodes} | Config]. end_per_testcase(_Case, Config) -> ok = snabbkaffe:stop(), - ok = emqx_eviction_agent_test_helpers:stop_cluster( - ?config(cluster_nodes, Config), - [emqx_eviction_agent, emqx_node_rebalance] - ). + stop_cluster(?config(cluster_nodes, Config)). %%-------------------------------------------------------------------- %% Tests @@ -89,10 +89,9 @@ t_agent_busy(Config) -> ok = rpc:call(DonorNode, emqx_eviction_agent, enable, [other_rebalance, undefined]), - ?assertWaitEvent( - rpc:call(DonorNode, emqx_node_rebalance_evacuation, start, [opts(Config)]), - #{?snk_kind := eviction_agent_busy}, - 5000 + ?assertEqual( + {error, eviction_agent_busy}, + rpc:call(DonorNode, emqx_node_rebalance_evacuation, start, [opts(Config)]) ). t_already_started(Config) -> @@ -118,7 +117,13 @@ t_start(Config) -> [{DonorNode, DonorPort}] = ?config(cluster_nodes, Config), ?assertWaitEvent( - rpc:call(DonorNode, emqx_node_rebalance_evacuation, start, [opts(Config)]), + begin + rpc:call(DonorNode, emqx_node_rebalance_evacuation, start, [opts(Config)]), + ?assertMatch( + ok, + emqtt_try_connect([{port, DonorPort}]) + ) + end, #{?snk_kind := eviction_agent_started}, 5000 ), diff --git a/apps/emqx_node_rebalance/test/emqx_node_rebalance_purge_SUITE.erl b/apps/emqx_node_rebalance/test/emqx_node_rebalance_purge_SUITE.erl index 7cdcc4d71..f74da13f6 100644 --- a/apps/emqx_node_rebalance/test/emqx_node_rebalance_purge_SUITE.erl +++ b/apps/emqx_node_rebalance/test/emqx_node_rebalance_purge_SUITE.erl @@ -18,7 +18,9 @@ [ emqtt_connect/1, emqtt_try_connect/1, - case_specific_node_name/3 + case_specific_node_name/3, + stop_many/1, + get_mqtt_port/2 ] ). @@ -41,11 +43,13 @@ one_node_cases() -> emqx_common_test_helpers:all(?MODULE) -- two_nodes_cases(). init_per_suite(Config) -> - ok = emqx_common_test_helpers:start_apps([]), - Config. + Apps = emqx_cth_suite:start([emqx], #{ + work_dir => ?config(priv_dir, Config) + }), + [{apps, Apps} | Config]. -end_per_suite(_Config) -> - ok = emqx_common_test_helpers:stop_apps([]), +end_per_suite(Config) -> + ok = emqx_cth_suite:stop(?config(apps, Config)), ok. init_per_group(one_node, Config) -> @@ -78,7 +82,7 @@ init_per_testcase(TestCase, Config) -> Cluster = [{Node, Spec} || Node <- Nodes], ClusterNodes = emqx_cth_cluster:start( Cluster, - #{work_dir => ?config(priv_dir, Config)} + #{work_dir => emqx_cth_suite:work_dir(TestCase, Config)} ), ok = snabbkaffe:start_trace(), [{cluster_nodes, ClusterNodes} | Config]. @@ -128,20 +132,12 @@ case_specific_data_dir(Case, Config) -> PrivDir -> filename:join(PrivDir, atom_to_list(Case)) end. -get_mqtt_port(Node, Type) -> - {_IP, Port} = erpc:call(Node, emqx_config, get, [[listeners, Type, default, bind]]), - Port. - %% to avoid it finishing too fast with_some_sessions(Node, Fn) -> - erpc:call(Node, fun() -> - emqx_common_test_helpers:with_mock( - emqx_eviction_agent, - all_channels_count, - fun() -> 100 end, - Fn - ) - end). + Port = get_mqtt_port(Node, tcp), + Conns = emqtt_connect_many(Port, 100), + _ = erpc:call(Node, Fn), + ok = stop_many(Conns). drain_exits([ClientPid | Rest]) -> receive @@ -189,6 +185,7 @@ t_agent_busy(Config) -> ok. t_already_started(Config) -> + process_flag(trap_exit, true), [Node] = ?config(cluster_nodes, Config), with_some_sessions(Node, fun() -> ok = emqx_node_rebalance_purge:start(opts(Config)), @@ -216,6 +213,7 @@ t_not_started(Config) -> ). t_start(Config) -> + process_flag(trap_exit, true), [Node] = ?config(cluster_nodes, Config), Port = get_mqtt_port(Node, tcp), @@ -233,6 +231,7 @@ t_start(Config) -> ok. t_non_persistence(Config) -> + process_flag(trap_exit, true), [Node] = ?config(cluster_nodes, Config), Port = get_mqtt_port(Node, tcp), @@ -284,6 +283,7 @@ t_unknown_messages(Config) -> %%-------------------------------------------------------------------- t_already_started_two(Config) -> + process_flag(trap_exit, true), [Node1, _Node2] = ?config(cluster_nodes, Config), with_some_sessions(Node1, fun() -> ok = emqx_node_rebalance_purge:start(opts(Config)), diff --git a/apps/emqx_node_rebalance/test/emqx_node_rebalance_status_SUITE.erl b/apps/emqx_node_rebalance/test/emqx_node_rebalance_status_SUITE.erl index f9c50b761..9351e065e 100644 --- a/apps/emqx_node_rebalance/test/emqx_node_rebalance_status_SUITE.erl +++ b/apps/emqx_node_rebalance/test/emqx_node_rebalance_status_SUITE.erl @@ -32,6 +32,7 @@ init_per_suite(Config) -> Apps = [ emqx_conf, emqx, + emqx_eviction_agent, emqx_node_rebalance ], Cluster = [ diff --git a/changes/ee/feat-11971.en.md b/changes/ee/feat-11971.en.md new file mode 100644 index 000000000..edf99cae2 --- /dev/null +++ b/changes/ee/feat-11971.en.md @@ -0,0 +1,4 @@ +Made `/api/v5/load_rebalance/availability_check` public, i.e. not requiring authentication. This simplifies load balancer setup. + +Made rebalance/evacuation more graceful during the wait health check phase. The connections to nodes marked for eviction are now not prohibited during this phase. +During this phase it is unknown whether these nodes are all marked unhealthy by the load balancer, so prohibiting connections to them may cause multiple unssuccessful attempts to reconnect.