From d1a1e8041d5268440f5a1cafaa678eaf5fd6e745 Mon Sep 17 00:00:00 2001 From: Ilya Averyanov Date: Wed, 7 Jun 2023 13:57:16 +0300 Subject: [PATCH 1/3] fix(rebalance api): fix error message formatting --- .../src/emqx_node_rebalance_api.erl | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/apps/emqx_node_rebalance/src/emqx_node_rebalance_api.erl b/apps/emqx_node_rebalance/src/emqx_node_rebalance_api.erl index d0526f5d5..713e16463 100644 --- a/apps/emqx_node_rebalance/src/emqx_node_rebalance_api.erl +++ b/apps/emqx_node_rebalance/src/emqx_node_rebalance_api.erl @@ -258,11 +258,11 @@ wrap_rpc(Node, RPCResult) -> {200, #{}}; {error, Reason} -> error_response( - 400, ?BAD_REQUEST, io_lib:format("error on node ~p: ~p", [Node, Reason]) + 400, ?BAD_REQUEST, binfmt("error on node ~p: ~p", [Node, Reason]) ); {badrpc, Reason} -> error_response( - 503, ?RPC_ERROR, io_lib:format("RPC error on node ~p: ~p", [Node, Reason]) + 503, ?RPC_ERROR, binfmt("RPC error on node ~p: ~p", [Node, Reason]) ) end. @@ -299,9 +299,9 @@ with_nodes_at_key(Key, Params, Fun) -> {ok, Params1} -> Fun(Params1); {error, {unavailable, Nodes}} -> - error_response(400, ?NOT_FOUND, io_lib:format("Nodes unavailable: ~p", [Nodes])); + error_response(400, ?NOT_FOUND, binfmt("Nodes unavailable: ~p", [Nodes])); {error, {invalid, Nodes}} -> - error_response(400, ?BAD_REQUEST, io_lib:format("Invalid nodes: ~p", [Nodes])) + error_response(400, ?BAD_REQUEST, binfmt("Invalid nodes: ~p", [Nodes])) end. parse_node(Bin) when is_binary(Bin) -> @@ -331,6 +331,8 @@ without(Keys, Props) -> Props ). +binfmt(Fmt, Args) -> iolist_to_binary(io_lib:format(Fmt, Args)). + %%------------------------------------------------------------------------------ %% Schema %%------------------------------------------------------------------------------ From 7f2de66dab7d2d5b2c2ede30f95385273677fbbe Mon Sep 17 00:00:00 2001 From: Ilya Averyanov Date: Wed, 7 Jun 2023 18:35:41 +0300 Subject: [PATCH 2/3] fix(rebalance): add wait_health_check timeout to node evacuation Co-authored-by: Thales Macedo Garitezi --- .../src/emqx_node_rebalance.erl | 6 +- .../src/emqx_node_rebalance_api.erl | 13 +++- .../src/emqx_node_rebalance_cli.erl | 7 +- .../src/emqx_node_rebalance_evacuation.erl | 66 ++++++++++++------- ...emqx_node_rebalance_evacuation_persist.erl | 18 ++--- .../test/emqx_node_rebalance_api_SUITE.erl | 2 + .../emqx_node_rebalance_evacuation_SUITE.erl | 24 +++++-- rel/i18n/emqx_node_rebalance_api.hocon | 2 +- 8 files changed, 89 insertions(+), 49 deletions(-) diff --git a/apps/emqx_node_rebalance/src/emqx_node_rebalance.erl b/apps/emqx_node_rebalance/src/emqx_node_rebalance.erl index 70c022308..9d53841ed 100644 --- a/apps/emqx_node_rebalance/src/emqx_node_rebalance.erl +++ b/apps/emqx_node_rebalance/src/emqx_node_rebalance.erl @@ -48,8 +48,8 @@ -type start_opts() :: #{ conn_evict_rate => pos_integer(), sess_evict_rate => pos_integer(), - wait_health_check => pos_integer(), - wait_takeover => pos_integer(), + wait_health_check => number(), + wait_takeover => number(), abs_conn_threshold => pos_integer(), rel_conn_threshold => number(), abs_sess_threshold => pos_integer(), @@ -438,7 +438,7 @@ is_node_available() -> node(). all_nodes() -> - mria_mnesia:running_nodes(). + emqx:running_nodes(). seconds(Sec) -> round(timer:seconds(Sec)). diff --git a/apps/emqx_node_rebalance/src/emqx_node_rebalance_api.erl b/apps/emqx_node_rebalance/src/emqx_node_rebalance_api.erl index 713e16463..abae139ad 100644 --- a/apps/emqx_node_rebalance/src/emqx_node_rebalance_api.erl +++ b/apps/emqx_node_rebalance/src/emqx_node_rebalance_api.erl @@ -202,10 +202,10 @@ schema("/load_rebalance/:node/evacuation/stop") -> }}. '/load_rebalance/availability_check'(get, #{}) -> - case emqx_eviction_agent:status() of + case emqx_node_rebalance_status:local_status() of disabled -> {200, #{}}; - {enabled, _Stats} -> + _ -> error_response(503, ?NODE_EVACUATING, <<"Node Evacuating">>) end. @@ -434,6 +434,14 @@ fields(rebalance_start) -> ]; fields(rebalance_evacuation_start) -> [ + {"wait_health_check", + mk( + emqx_schema:timeout_duration_s(), + #{ + desc => ?DESC(wait_health_check), + required => false + } + )}, {"conn_evict_rate", mk( pos_integer(), @@ -714,6 +722,7 @@ rebalance_example() -> rebalance_evacuation_example() -> #{ + wait_health_check => 10, conn_evict_rate => 100, sess_evict_rate => 100, redirect_to => <<"othernode:1883">>, diff --git a/apps/emqx_node_rebalance/src/emqx_node_rebalance_cli.erl b/apps/emqx_node_rebalance/src/emqx_node_rebalance_cli.erl index 3bafb9ffe..66f7a1789 100644 --- a/apps/emqx_node_rebalance/src/emqx_node_rebalance_cli.erl +++ b/apps/emqx_node_rebalance/src/emqx_node_rebalance_cli.erl @@ -103,6 +103,7 @@ cli(_) -> [ { "rebalance start --evacuation \\\n" + " [--wait-health-check Secs] \\\n" " [--redirect-to \"Host1:Port1 Host2:Port2 ...\"] \\\n" " [--conn-evict-rate CountPerSec] \\\n" " [--migrate-to \"node1@host1 node2@host2 ...\"] \\\n" @@ -182,8 +183,6 @@ collect_args(["--migrate-to", MigrateTo | Args], Map) -> %% rebalance collect_args(["--nodes", Nodes | Args], Map) -> collect_args(Args, Map#{"--nodes" => Nodes}); -collect_args(["--wait-health-check", WaitHealthCheck | Args], Map) -> - collect_args(Args, Map#{"--wait-health-check" => WaitHealthCheck}); collect_args(["--abs-conn-threshold", AbsConnThres | Args], Map) -> collect_args(Args, Map#{"--abs-conn-threshold" => AbsConnThres}); collect_args(["--rel-conn-threshold", RelConnThres | Args], Map) -> @@ -193,6 +192,8 @@ collect_args(["--abs-sess-threshold", AbsSessThres | Args], Map) -> collect_args(["--rel-sess-threshold", RelSessThres | Args], Map) -> collect_args(Args, Map#{"--rel-sess-threshold" => RelSessThres}); %% common +collect_args(["--wait-health-check", WaitHealthCheck | Args], Map) -> + collect_args(Args, Map#{"--wait-health-check" => WaitHealthCheck}); collect_args(["--conn-evict-rate", ConnEvictRate | Args], Map) -> collect_args(Args, Map#{"--conn-evict-rate" => ConnEvictRate}); collect_args(["--wait-takeover", WaitTakeover | Args], Map) -> @@ -207,6 +208,8 @@ validate_evacuation([], Map) -> {ok, Map}; validate_evacuation([{"--evacuation", _} | Rest], Map) -> validate_evacuation(Rest, Map); +validate_evacuation([{"--wait-health-check", _} | _] = Opts, Map) -> + validate_pos_int(wait_health_check, Opts, Map, fun validate_evacuation/2); validate_evacuation([{"--redirect-to", ServerReference} | Rest], Map) -> validate_evacuation(Rest, Map#{server_reference => list_to_binary(ServerReference)}); validate_evacuation([{"--conn-evict-rate", _} | _] = Opts, Map) -> diff --git a/apps/emqx_node_rebalance/src/emqx_node_rebalance_evacuation.erl b/apps/emqx_node_rebalance/src/emqx_node_rebalance_evacuation.erl index 4de362ca9..6b6aa0675 100644 --- a/apps/emqx_node_rebalance/src/emqx_node_rebalance_evacuation.erl +++ b/apps/emqx_node_rebalance/src/emqx_node_rebalance_evacuation.erl @@ -53,10 +53,11 @@ server_reference => emqx_eviction_agent:server_reference(), conn_evict_rate => pos_integer(), sess_evict_rate => pos_integer(), - wait_takeover => pos_integer(), - migrate_to => migrate_to() + wait_takeover => number(), + migrate_to => migrate_to(), + wait_health_check => number() }. --type start_error() :: already_started | eviction_agent_busy. +-type start_error() :: already_started. -type stats() :: #{ initial_conns := non_neg_integer(), initial_sessions := non_neg_integer(), @@ -97,7 +98,7 @@ available_nodes(Nodes) when is_list(Nodes) -> callback_mode() -> handle_event_function. -%% states: disabled, evicting_conns, waiting_takeover, evicting_sessions, prohibiting +%% states: disabled, waiting_health_check, evicting_conns, waiting_takeover, evicting_sessions, prohibiting init([]) -> case emqx_node_rebalance_evacuation_persist:read(default_opts()) of @@ -119,25 +120,20 @@ init([]) -> %% start handle_event( {call, From}, - {start, #{server_reference := ServerReference} = Opts}, + {start, #{wait_health_check := WaitHealthCheck} = Opts}, disabled, #{} = Data ) -> - case emqx_eviction_agent:enable(?MODULE, ServerReference) of - ok -> - NewData = init_data(Data, Opts), - ok = emqx_node_rebalance_evacuation_persist:save(Opts), - ?SLOG(warning, #{ - msg => "node_evacuation_started", - opts => Opts - }), - {next_state, evicting_conns, NewData, [ - {state_timeout, 0, evict_conns}, - {reply, From, ok} - ]}; - {error, eviction_agent_busy} -> - {keep_state_and_data, [{reply, From, {error, eviction_agent_busy}}]} - end; + ?SLOG(warning, #{ + msg => "node_evacuation_started", + opts => Opts + }), + NewData = init_data(Data, Opts), + ok = emqx_node_rebalance_evacuation_persist:save(Opts), + {next_state, waiting_health_check, NewData, [ + {state_timeout, seconds(WaitHealthCheck), start_eviction}, + {reply, From, ok} + ]}; handle_event({call, From}, {start, _Opts}, _State, #{}) -> {keep_state_and_data, [{reply, From, {error, already_started}}]}; %% stop @@ -167,6 +163,27 @@ handle_event({call, From}, status, State, #{migrate_to := MigrateTo} = Data) -> {keep_state_and_data, [ {reply, From, {enabled, Stats#{state => State, migrate_to => migrate_to(MigrateTo)}}} ]}; +%% start eviction +handle_event( + state_timeout, + start_eviction, + waiting_health_check, + #{server_reference := ServerReference} = Data +) -> + case emqx_eviction_agent:enable(?MODULE, ServerReference) of + ok -> + ?tp(debug, eviction_agent_started, #{ + data => Data + }), + {next_state, evicting_conns, Data, [ + {state_timeout, 0, evict_conns} + ]}; + {error, eviction_agent_busy} -> + ?tp(warning, eviction_agent_busy, #{ + data => Data + }), + {next_state, disabled, deinit(Data)} + end; %% conn eviction handle_event( state_timeout, @@ -270,12 +287,14 @@ default_opts() -> conn_evict_rate => ?DEFAULT_CONN_EVICT_RATE, sess_evict_rate => ?DEFAULT_SESS_EVICT_RATE, wait_takeover => ?DEFAULT_WAIT_TAKEOVER, + wait_health_check => ?DEFAULT_WAIT_HEALTH_CHECK, migrate_to => undefined }. init_data(Data0, Opts) -> Data1 = maps:merge(Data0, Opts), - {enabled, #{connections := ConnCount, sessions := SessCount}} = emqx_eviction_agent:status(), + ConnCount = emqx_eviction_agent:connection_count(), + SessCount = emqx_eviction_agent:session_count(), Data1#{ initial_conns => ConnCount, current_conns => ConnCount, @@ -305,4 +324,7 @@ is_node_available() -> node(). all_nodes() -> - mria_mnesia:running_nodes() -- [node()]. + emqx:running_nodes() -- [node()]. + +seconds(Sec) -> + round(timer:seconds(Sec)). diff --git a/apps/emqx_node_rebalance/src/emqx_node_rebalance_evacuation_persist.erl b/apps/emqx_node_rebalance/src/emqx_node_rebalance_evacuation_persist.erl index 6b145c699..f32bc6ddd 100644 --- a/apps/emqx_node_rebalance/src/emqx_node_rebalance_evacuation_persist.erl +++ b/apps/emqx_node_rebalance/src/emqx_node_rebalance_evacuation_persist.erl @@ -21,24 +21,16 @@ %% APIs %%-------------------------------------------------------------------- -%% do not persist `migrate_to`: -%% * after restart there is nothing to migrate -%% * this value may be invalid after node was offline --type persisted_start_opts() :: #{ - server_reference => emqx_eviction_agent:server_reference(), - conn_evict_rate => pos_integer(), - sess_evict_rate => pos_integer(), - wait_takeover => pos_integer() -}. -type start_opts() :: #{ server_reference => emqx_eviction_agent:server_reference(), conn_evict_rate => pos_integer(), sess_evict_rate => pos_integer(), - wait_takeover => pos_integer(), - migrate_to => emqx_node_rebalance_evacuation:migrate_to() + wait_takeover => number(), + migrate_to => emqx_node_rebalance_evacuation:migrate_to(), + wait_health_check => number() }. --spec save(persisted_start_opts()) -> ok_or_error(term()). +-spec save(start_opts()) -> ok_or_error(term()). save( #{ server_reference := ServerReference, @@ -50,7 +42,7 @@ save( (is_binary(ServerReference) orelse ServerReference =:= undefined) andalso is_integer(ConnEvictRate) andalso ConnEvictRate > 0 andalso is_integer(SessEvictRate) andalso SessEvictRate > 0 andalso - is_integer(WaitTakeover) andalso WaitTakeover >= 0 + is_number(WaitTakeover) andalso WaitTakeover >= 0 -> Filepath = evacuation_filepath(), case filelib:ensure_dir(Filepath) of diff --git a/apps/emqx_node_rebalance/test/emqx_node_rebalance_api_SUITE.erl b/apps/emqx_node_rebalance/test/emqx_node_rebalance_api_SUITE.erl index d8202a33e..119b4a5d9 100644 --- a/apps/emqx_node_rebalance/test/emqx_node_rebalance_api_SUITE.erl +++ b/apps/emqx_node_rebalance/test/emqx_node_rebalance_api_SUITE.erl @@ -69,6 +69,7 @@ t_start_evacuation_validation(Config) -> #{sess_evict_rate => <<"sess">>}, #{redirect_to => 123}, #{wait_takeover => <<"wait">>}, + #{wait_health_check => <<"wait">>}, #{migrate_to => []}, #{migrate_to => <<"migrate_to">>}, #{migrate_to => [<<"bad_node">>]}, @@ -103,6 +104,7 @@ t_start_evacuation_validation(Config) -> conn_evict_rate => 10, sess_evict_rate => 10, wait_takeover => 10, + wait_health_check => 10, redirect_to => <<"srv">>, migrate_to => [atom_to_binary(RecipientNode)] } diff --git a/apps/emqx_node_rebalance/test/emqx_node_rebalance_evacuation_SUITE.erl b/apps/emqx_node_rebalance/test/emqx_node_rebalance_evacuation_SUITE.erl index 5d774ba7c..b7f1ebb63 100644 --- a/apps/emqx_node_rebalance/test/emqx_node_rebalance_evacuation_SUITE.erl +++ b/apps/emqx_node_rebalance/test/emqx_node_rebalance_evacuation_SUITE.erl @@ -86,11 +86,13 @@ end_per_testcase(_Case, Config) -> t_agent_busy(Config) -> [{DonorNode, _DonorPort}] = ?config(cluster_nodes, Config), + ok = rpc:call(DonorNode, emqx_eviction_agent, enable, [other_rebalance, undefined]), - ?assertEqual( - {error, eviction_agent_busy}, - rpc:call(DonorNode, emqx_node_rebalance_evacuation, start, [opts(Config)]) + ?assertWaitEvent( + rpc:call(DonorNode, emqx_node_rebalance_evacuation, start, [opts(Config)]), + #{?snk_kind := eviction_agent_busy}, + 5000 ). t_already_started(Config) -> @@ -115,7 +117,12 @@ t_start(Config) -> [{DonorNode, DonorPort}] = ?config(cluster_nodes, Config), - ok = rpc:call(DonorNode, emqx_node_rebalance_evacuation, start, [opts(Config)]), + ?assertWaitEvent( + rpc:call(DonorNode, emqx_node_rebalance_evacuation, start, [opts(Config)]), + #{?snk_kind := eviction_agent_started}, + 5000 + ), + ?assertMatch( {error, {use_another_server, #{}}}, emqtt_try_connect([{port, DonorPort}]) @@ -126,7 +133,11 @@ t_persistence(Config) -> [{DonorNode, DonorPort}] = ?config(cluster_nodes, Config), - ok = rpc:call(DonorNode, emqx_node_rebalance_evacuation, start, [opts(Config)]), + ?assertWaitEvent( + rpc:call(DonorNode, emqx_node_rebalance_evacuation, start, [opts(Config)]), + #{?snk_kind := eviction_agent_started}, + 5000 + ), ?assertMatch( {error, {use_another_server, #{}}}, @@ -179,7 +190,7 @@ t_conn_evicted(Config) -> ?assertWaitEvent( ok = rpc:call(DonorNode, emqx_node_rebalance_evacuation, start, [opts(Config)]), #{?snk_kind := node_evacuation_evict_conn}, - 1000 + 5000 ), ?assertMatch( @@ -251,6 +262,7 @@ opts(Config) -> conn_evict_rate => 10, sess_evict_rate => 10, wait_takeover => 1, + wait_health_check => 1, migrate_to => migrate_to(Config) }. diff --git a/rel/i18n/emqx_node_rebalance_api.hocon b/rel/i18n/emqx_node_rebalance_api.hocon index bb67f2aad..8b598134a 100644 --- a/rel/i18n/emqx_node_rebalance_api.hocon +++ b/rel/i18n/emqx_node_rebalance_api.hocon @@ -49,7 +49,7 @@ param_node.label: """Node name""" wait_health_check.desc: -"""Time to wait before starting the rebalance process, in seconds""" +"""Time to wait before starting the rebalance/evacuation process, in seconds""" wait_health_check.label: """Wait health check""" From 54f7941329885adfa172fa66b389778e251aa0a2 Mon Sep 17 00:00:00 2001 From: Ilya Averyanov Date: Wed, 7 Jun 2023 18:54:54 +0300 Subject: [PATCH 3/3] fix(rebalancing): add changelog Co-authored-by: Thales Macedo Garitezi --- changes/ee/fix-10967.en.md | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 changes/ee/fix-10967.en.md diff --git a/changes/ee/fix-10967.en.md b/changes/ee/fix-10967.en.md new file mode 100644 index 000000000..55cc2b975 --- /dev/null +++ b/changes/ee/fix-10967.en.md @@ -0,0 +1,3 @@ +Fixed error message formatting in rebalance API: previously they could be displayed as unclear dumps of internal Erlang structures. + +Added `wait_health_check` option to node evacuation CLI and API. This is a time interval when the node reports "unhealthy status" without beginning actual evacuation. We need this to allow a Load Balancer (if any) to remove the evacuated node from balancing and not forward (re)connecting clients to the evacuated node.