Merge pull request #12802 from SergeTupchiy/EMQX-11826-prevent-left-node-from-rejoining-5.6.1
prevent a left node from rejoining the same cluster
This commit is contained in:
commit
2e528d1dd8
|
@ -28,7 +28,7 @@
|
||||||
{gproc, {git, "https://github.com/emqx/gproc", {tag, "0.9.0.1"}}},
|
{gproc, {git, "https://github.com/emqx/gproc", {tag, "0.9.0.1"}}},
|
||||||
{cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.2"}}},
|
{cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.2"}}},
|
||||||
{esockd, {git, "https://github.com/emqx/esockd", {tag, "5.11.1"}}},
|
{esockd, {git, "https://github.com/emqx/esockd", {tag, "5.11.1"}}},
|
||||||
{ekka, {git, "https://github.com/emqx/ekka", {tag, "0.19.1"}}},
|
{ekka, {git, "https://github.com/emqx/ekka", {tag, "0.19.2"}}},
|
||||||
{gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "3.3.1"}}},
|
{gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "3.3.1"}}},
|
||||||
{hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.42.1"}}},
|
{hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.42.1"}}},
|
||||||
{emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.3"}}},
|
{emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.3"}}},
|
||||||
|
|
|
@ -108,6 +108,7 @@ cluster(["join", SNode]) ->
|
||||||
emqx_ctl:print("Failed to join the cluster: ~0p~n", [Error])
|
emqx_ctl:print("Failed to join the cluster: ~0p~n", [Error])
|
||||||
end;
|
end;
|
||||||
cluster(["leave"]) ->
|
cluster(["leave"]) ->
|
||||||
|
_ = maybe_disable_autocluster(),
|
||||||
case mria:leave() of
|
case mria:leave() of
|
||||||
ok ->
|
ok ->
|
||||||
emqx_ctl:print("Leave the cluster successfully.~n"),
|
emqx_ctl:print("Leave the cluster successfully.~n"),
|
||||||
|
@ -139,12 +140,15 @@ cluster(["status"]) ->
|
||||||
cluster(["status", "--json"]) ->
|
cluster(["status", "--json"]) ->
|
||||||
Info = sort_map_list_fields(cluster_info()),
|
Info = sort_map_list_fields(cluster_info()),
|
||||||
emqx_ctl:print("~ts~n", [emqx_logger_jsonfmt:best_effort_json(Info)]);
|
emqx_ctl:print("~ts~n", [emqx_logger_jsonfmt:best_effort_json(Info)]);
|
||||||
|
cluster(["discovery", "enable"]) ->
|
||||||
|
enable_autocluster();
|
||||||
cluster(_) ->
|
cluster(_) ->
|
||||||
emqx_ctl:usage([
|
emqx_ctl:usage([
|
||||||
{"cluster join <Node>", "Join the cluster"},
|
{"cluster join <Node>", "Join the cluster"},
|
||||||
{"cluster leave", "Leave the cluster"},
|
{"cluster leave", "Leave the cluster"},
|
||||||
{"cluster force-leave <Node>", "Force the node leave from cluster"},
|
{"cluster force-leave <Node>", "Force the node leave from cluster"},
|
||||||
{"cluster status [--json]", "Cluster status"}
|
{"cluster status [--json]", "Cluster status"},
|
||||||
|
{"cluster discovery enable", "Enable and run automatic cluster discovery (if configured)"}
|
||||||
]).
|
]).
|
||||||
|
|
||||||
%% sort lists for deterministic output
|
%% sort lists for deterministic output
|
||||||
|
@ -163,6 +167,25 @@ sort_map_list_field(Field, Map) ->
|
||||||
_ -> Map
|
_ -> Map
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
%% Backs the `cluster discovery enable` CLI command.
%% Re-enables ekka autocluster (crashes with badmatch unless ekka returns `ok`),
%% then calls ekka:autocluster(emqx) -- presumably this triggers a discovery run,
%% as the CLI usage text says "Enable and run"; confirm against ekka.
%% The return value of ekka:autocluster/1 is deliberately ignored.
%% Finally prints a confirmation through emqx_ctl.
enable_autocluster() ->
|
||||||
|
ok = ekka:enable_autocluster(),
|
||||||
|
_ = ekka:autocluster(emqx),
|
||||||
|
emqx_ctl:print("Automatic cluster discovery enabled.~n").
|
||||||
|
|
||||||
|
%% Called by `cluster leave` before mria:leave/0.
%% If ekka reports autocluster as enabled, disables it (crashes with badmatch
%% unless ekka returns `ok`) and prints a notice naming this node and explaining
%% how to turn discovery back on. If autocluster is already disabled, returns
%% `ok` without printing anything.
%% The caller ignores the return value.
maybe_disable_autocluster() ->
|
||||||
|
case ekka:autocluster_enabled() of
|
||||||
|
true ->
|
||||||
|
ok = ekka:disable_autocluster(),
|
||||||
|
emqx_ctl:print(
|
||||||
|
"Automatic cluster discovery is disabled on this node: ~p to avoid"
|
||||||
|
" re-joining the same cluster again, if the node is not stopped soon."
|
||||||
|
" To enable it run: 'emqx ctl cluster discovery enable' or restart the node.~n",
|
||||||
|
[node()]
|
||||||
|
);
|
||||||
|
false ->
|
||||||
|
ok
|
||||||
|
end.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% @doc Query clients
|
%% @doc Query clients
|
||||||
|
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
-compile(nowarn_export_all).
|
-compile(nowarn_export_all).
|
||||||
|
|
||||||
-include_lib("eunit/include/eunit.hrl").
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
-include_lib("common_test/include/ct.hrl").
|
||||||
|
|
||||||
all() ->
|
all() ->
|
||||||
emqx_common_test_helpers:all(?MODULE).
|
emqx_common_test_helpers:all(?MODULE).
|
||||||
|
@ -31,6 +32,47 @@ init_per_suite(Config) ->
|
||||||
end_per_suite(_) ->
|
end_per_suite(_) ->
|
||||||
emqx_mgmt_api_test_util:end_suite([emqx_management, emqx_conf]).
|
emqx_mgmt_api_test_util:end_suite([emqx_management, emqx_conf]).
|
||||||
|
|
||||||
|
%% Common Test per-testcase setup.
%% For t_autocluster_leave: starts four nodes through emqx_cth_cluster:start/2
%% (three with role `core`, one with role `replicant`). Every node runs emqx,
%% emqx_conf and emqx_management, and emqx_conf is configured with the `static`
%% discovery strategy seeded with the names of all four nodes. Each node's
%% working directory is derived from the testcase name and Config.
%% The value returned by emqx_cth_cluster:start/2 is stored in Config under the
%% `cluster` key.
%% Any other testcase gets Config back unchanged.
init_per_testcase(t_autocluster_leave = TC, Config) ->
|
||||||
|
[Core1, Core2, Core3, Repl] =
|
||||||
|
Nodes = [
|
||||||
|
t_autocluster_leave_core1,
|
||||||
|
t_autocluster_leave_core2,
|
||||||
|
t_autocluster_leave_core3,
|
||||||
|
t_autocluster_leave_replicant
|
||||||
|
],
|
||||||
|
|
||||||
|
NodeNames = [emqx_cth_cluster:node_name(N) || N <- Nodes],
|
||||||
|
AppSpec = [
|
||||||
|
emqx,
|
||||||
|
{emqx_conf, #{
|
||||||
|
config => #{
|
||||||
|
cluster => #{
|
||||||
|
discovery_strategy => static,
|
||||||
|
static => #{seeds => NodeNames}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}},
|
||||||
|
emqx_management
|
||||||
|
],
|
||||||
|
Cluster = emqx_cth_cluster:start(
|
||||||
|
[
|
||||||
|
{Core1, #{role => core, apps => AppSpec}},
|
||||||
|
{Core2, #{role => core, apps => AppSpec}},
|
||||||
|
{Core3, #{role => core, apps => AppSpec}},
|
||||||
|
{Repl, #{role => replicant, apps => AppSpec}}
|
||||||
|
],
|
||||||
|
#{work_dir => emqx_cth_suite:work_dir(TC, Config)}
|
||||||
|
),
|
||||||
|
[{cluster, Cluster} | Config];
|
||||||
|
init_per_testcase(_TC, Config) ->
|
||||||
|
Config.
|
||||||
|
|
||||||
|
%% Common Test per-testcase teardown.
%% If Config carries a `cluster` entry (set by init_per_testcase/2 for
%% t_autocluster_leave), hands it to emqx_cth_cluster:stop/1. Testcases that
%% started no cluster have no such entry, and nothing is done for them.
end_per_testcase(_TC, Config) ->
|
||||||
|
case ?config(cluster, Config) of
|
||||||
|
undefined -> ok;
|
||||||
|
Cluster -> emqx_cth_cluster:stop(Cluster)
|
||||||
|
end.
|
||||||
|
|
||||||
t_status(_Config) ->
|
t_status(_Config) ->
|
||||||
emqx_ctl:run_command([]),
|
emqx_ctl:run_command([]),
|
||||||
emqx_ctl:run_command(["status"]),
|
emqx_ctl:run_command(["status"]),
|
||||||
|
@ -263,3 +305,44 @@ t_admin(_Config) ->
|
||||||
%% admins passwd <Username> <Password> # Reset dashboard user password
|
%% admins passwd <Username> <Password> # Reset dashboard user password
|
||||||
%% admins del <Username> # Delete dashboard user
|
%% admins del <Username> # Delete dashboard user
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
%% Checks that a node which ran `cluster leave` stays out of the cluster until
%% discovery is explicitly re-enabled. Steps:
%%   1. all four nodes report the same full set of running nodes;
%%   2. Core3 leaves: it sees only itself, the other three see each other;
%%   3. Repl leaves: Core1 and Core2 see only each other;
%%   4. `cluster discovery enable` on Core3 and Repl: within 10 s every node
%%      reports the original cluster view again.
%% NOTE(review): the `whereis(ekka_autocluster)` assertion below is evaluated on
%% Core1, while the node that just left is Core3 -- confirm the intended target.
t_autocluster_leave(Config) ->
|
||||||
|
[Core1, Core2, Core3, Repl] = Cluster = ?config(cluster, Config),
|
||||||
|
%% Mria membership updates are async, makes sense to wait a little
|
||||||
|
timer:sleep(300),
|
||||||
|
ClusterView = [lists:sort(rpc:call(N, emqx, running_nodes, [])) || N <- Cluster],
|
||||||
|
[View1, View2, View3, View4] = ClusterView,
|
||||||
|
?assertEqual(lists:sort(Cluster), View1),
|
||||||
|
?assertEqual(View1, View2),
|
||||||
|
?assertEqual(View1, View3),
|
||||||
|
?assertEqual(View1, View4),
|
||||||
|
|
||||||
|
rpc:call(Core3, emqx_mgmt_cli, cluster, [["leave"]]),
|
||||||
|
timer:sleep(1000),
|
||||||
|
%% Replicant node may still discover and join Core3 which is now split from [Core1, Core2],
|
||||||
|
%% but it's expected to choose a bigger cluster of [Core1, Core2]..
|
||||||
|
?assertMatch([Core3], rpc:call(Core3, emqx, running_nodes, [])),
|
||||||
|
?assertEqual(undefined, rpc:call(Core1, erlang, whereis, [ekka_autocluster])),
|
||||||
|
?assertEqual(lists:sort([Core1, Core2, Repl]), rpc:call(Core1, emqx, running_nodes, [])),
|
||||||
|
?assertEqual(lists:sort([Core1, Core2, Repl]), rpc:call(Core2, emqx, running_nodes, [])),
|
||||||
|
?assertEqual(lists:sort([Core1, Core2, Repl]), rpc:call(Repl, emqx, running_nodes, [])),
|
||||||
|
|
||||||
|
rpc:call(Repl, emqx_mgmt_cli, cluster, [["leave"]]),
|
||||||
|
timer:sleep(1000),
|
||||||
|
?assertEqual(lists:sort([Core1, Core2]), rpc:call(Core1, emqx, running_nodes, [])),
|
||||||
|
?assertEqual(lists:sort([Core1, Core2]), rpc:call(Core2, emqx, running_nodes, [])),
|
||||||
|
|
||||||
|
rpc:call(Core3, emqx_mgmt_cli, cluster, [["discovery", "enable"]]),
|
||||||
|
rpc:call(Repl, emqx_mgmt_cli, cluster, [["discovery", "enable"]]),
|
||||||
|
%% core nodes will join and restart asyncly, may need more time to re-cluster
|
||||||
|
?assertEqual(
|
||||||
|
ok,
|
||||||
|
emqx_common_test_helpers:wait_for(
|
||||||
|
?FUNCTION_NAME,
|
||||||
|
?LINE,
|
||||||
|
fun() ->
|
||||||
|
[lists:sort(rpc:call(N, emqx, running_nodes, [])) || N <- Cluster] =:= ClusterView
|
||||||
|
end,
|
||||||
|
10_000
|
||||||
|
)
|
||||||
|
).
|
||||||
|
|
|
@ -0,0 +1,3 @@
|
||||||
|
Improve cluster discovery behaviour when a node is manually removed from a cluster using the 'emqx ctl cluster leave' command.
|
||||||
|
Previously, if the configured cluster 'discovery_strategy' was not 'manual', the node that had left might re-discover and re-join the same cluster shortly afterwards (unless it was stopped).
|
||||||
|
After this change, the 'cluster leave' command disables automatic cluster discovery, so that the node that left won't re-join the same cluster again. Cluster discovery can be re-enabled by running 'emqx ctl cluster discovery enable' or by restarting that node.
|
2
mix.exs
2
mix.exs
|
@ -55,7 +55,7 @@ defmodule EMQXUmbrella.MixProject do
|
||||||
{:cowboy, github: "emqx/cowboy", tag: "2.9.2", override: true},
|
{:cowboy, github: "emqx/cowboy", tag: "2.9.2", override: true},
|
||||||
{:esockd, github: "emqx/esockd", tag: "5.11.1", override: true},
|
{:esockd, github: "emqx/esockd", tag: "5.11.1", override: true},
|
||||||
{:rocksdb, github: "emqx/erlang-rocksdb", tag: "1.8.0-emqx-2", override: true},
|
{:rocksdb, github: "emqx/erlang-rocksdb", tag: "1.8.0-emqx-2", override: true},
|
||||||
{:ekka, github: "emqx/ekka", tag: "0.19.1", override: true},
|
{:ekka, github: "emqx/ekka", tag: "0.19.2", override: true},
|
||||||
{:gen_rpc, github: "emqx/gen_rpc", tag: "3.3.1", override: true},
|
{:gen_rpc, github: "emqx/gen_rpc", tag: "3.3.1", override: true},
|
||||||
{:grpc, github: "emqx/grpc-erl", tag: "0.6.12", override: true},
|
{:grpc, github: "emqx/grpc-erl", tag: "0.6.12", override: true},
|
||||||
{:minirest, github: "emqx/minirest", tag: "1.4.0", override: true},
|
{:minirest, github: "emqx/minirest", tag: "1.4.0", override: true},
|
||||||
|
|
|
@ -83,7 +83,7 @@
|
||||||
{cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.2"}}},
|
{cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.2"}}},
|
||||||
{esockd, {git, "https://github.com/emqx/esockd", {tag, "5.11.1"}}},
|
{esockd, {git, "https://github.com/emqx/esockd", {tag, "5.11.1"}}},
|
||||||
{rocksdb, {git, "https://github.com/emqx/erlang-rocksdb", {tag, "1.8.0-emqx-2"}}},
|
{rocksdb, {git, "https://github.com/emqx/erlang-rocksdb", {tag, "1.8.0-emqx-2"}}},
|
||||||
{ekka, {git, "https://github.com/emqx/ekka", {tag, "0.19.1"}}},
|
{ekka, {git, "https://github.com/emqx/ekka", {tag, "0.19.2"}}},
|
||||||
{gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "3.3.1"}}},
|
{gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "3.3.1"}}},
|
||||||
{grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.12"}}},
|
{grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.12"}}},
|
||||||
{minirest, {git, "https://github.com/emqx/minirest", {tag, "1.4.0"}}},
|
{minirest, {git, "https://github.com/emqx/minirest", {tag, "1.4.0"}}},
|
||||||
|
|
Loading…
Reference in New Issue