diff --git a/apps/emqx_management/src/emqx_mgmt_api_cluster.erl b/apps/emqx_management/src/emqx_mgmt_api_cluster.erl index be1b8e354..0a9a17ea2 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_cluster.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_cluster.erl @@ -31,6 +31,8 @@ connected_replicants/0 ]). +-define(DEFAULT_INVITE_TIMEOUT, 15000). + namespace() -> "cluster". api_spec() -> @@ -77,6 +79,7 @@ schema("/cluster/:node/invite") -> desc => ?DESC(invite_node), tags => [<<"Cluster">>], parameters => [hoconsc:ref(node)], + 'requestBody' => hoconsc:ref(timeout), responses => #{ 200 => <<"ok">>, 400 => emqx_dashboard_swagger:error_codes(['BAD_REQUEST']) @@ -131,6 +134,14 @@ fields(core_replicants) -> #{desc => <<"Core node name">>, example => <<"emqx-core@127.0.0.1">>} )}, {replicant_nodes, ?HOCON(?ARRAY(?REF(replicant_info)))} + ]; +fields(timeout) -> + [ + {timeout, + ?HOCON( + non_neg_integer(), + #{desc => <<"Timeout in milliseconds">>, example => <<"15000">>} + )} ]. validate_node(Node) -> @@ -188,17 +199,24 @@ running_cores() -> Running = emqx:running_nodes(), lists:filter(fun(C) -> lists:member(C, Running) end, emqx:cluster_nodes(cores)). -invite_node(put, #{bindings := #{node := Node0}}) -> +invite_node(put, #{bindings := #{node := Node0}, body := Body}) -> Node = ekka_node:parse_name(binary_to_list(Node0)), - case emqx_mgmt_cluster_proto_v1:invite_node(Node, node()) of - ok -> - {200}; - ignore -> - {400, #{code => 'BAD_REQUEST', message => <<"Can't invite self">>}}; - {badrpc, Error} -> - {400, #{code => 'BAD_REQUEST', message => error_message(Error)}}; - {error, Error} -> - {400, #{code => 'BAD_REQUEST', message => error_message(Error)}} + case maps:get(<<"timeout">>, Body, ?DEFAULT_INVITE_TIMEOUT) of + T when not is_integer(T) -> + {400, #{code => 'BAD_REQUEST', message => <<"timeout must be integer">>}}; + T when T < 5000 -> + {400, #{code => 'BAD_REQUEST', message => <<"timeout can't less than 5000ms">>}}; + Timeout -> + case emqx_mgmt_cluster_proto_v3:invite_node(Node, node(), Timeout) of + ok -> + {200}; + ignore -> + {400, #{code => 'BAD_REQUEST', message => <<"Can't invite self">>}}; + {badrpc, Error} -> + {400, #{code => 'BAD_REQUEST', message => error_message(Error)}}; + {error, Error} -> + {400, #{code => 'BAD_REQUEST', message => error_message(Error)}} + end end. force_leave(delete, #{bindings := #{node := Node0}}) -> diff --git a/apps/emqx_management/src/proto/emqx_mgmt_cluster_proto_v3.erl b/apps/emqx_management/src/proto/emqx_mgmt_cluster_proto_v3.erl new file mode 100644 index 000000000..8110ac2cb --- /dev/null +++ b/apps/emqx_management/src/proto/emqx_mgmt_cluster_proto_v3.erl @@ -0,0 +1,38 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_mgmt_cluster_proto_v3). + +-behaviour(emqx_bpapi). + +-export([ + introduced_in/0, + invite_node/3, + connected_replicants/1 +]). + +-include_lib("emqx/include/bpapi.hrl"). + +introduced_in() -> + "5.5.0". + +-spec invite_node(node(), node(), timeout()) -> ok | ignore | {error, term()} | emqx_rpc:badrpc(). +invite_node(Node, Self, Timeout) when is_integer(Timeout) -> + rpc:call(Node, emqx_mgmt_api_cluster, join, [Self], Timeout). + +-spec connected_replicants([node()]) -> emqx_rpc:multicall_result(). +connected_replicants(Nodes) -> + rpc:multicall(Nodes, emqx_mgmt_api_cluster, connected_replicants, [], 30_000). diff --git a/apps/emqx_management/test/emqx_mgmt_api_cluster_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_cluster_SUITE.erl index 6e33c4001..3d4124b28 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_cluster_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_cluster_SUITE.erl @@ -35,6 +35,9 @@ end_per_suite(_) -> init_per_testcase(TC = t_cluster_topology_api_replicants, Config0) -> Config = [{tc_name, TC} | Config0], [{cluster, cluster(Config)} | setup(Config)]; +init_per_testcase(TC = t_cluster_invite_api_timeout, Config0) -> + Config = [{tc_name, TC} | Config0], + [{cluster, cluster(Config)} | setup(Config)]; init_per_testcase(_TC, Config) -> emqx_mgmt_api_test_util:init_suite(?APPS), Config. @@ -42,6 +45,9 @@ init_per_testcase(_TC, Config) -> end_per_testcase(t_cluster_topology_api_replicants, Config) -> emqx_cth_cluster:stop(?config(cluster, Config)), cleanup(Config); +end_per_testcase(t_cluster_invite_api_timeout, Config) -> + emqx_cth_cluster:stop(?config(cluster, Config)), + cleanup(Config); end_per_testcase(_TC, _Config) -> emqx_mgmt_api_test_util:end_suite(?APPS). @@ -77,12 +83,94 @@ t_cluster_topology_api_replicants(Config) -> || Resp <- [lists:sort(R) || R <- [Core1Resp, Core2Resp, ReplResp]] ]. +t_cluster_invite_api_timeout(Config) -> + %% assert the cluster is created + [Core1, Core2, Replicant] = _NodesList = ?config(cluster, Config), + {200, Core1Resp} = rpc:call(Core1, emqx_mgmt_api_cluster, cluster_topology, [get, #{}]), + ?assertMatch( + [ + #{ + core_node := Core1, + replicant_nodes := + [#{node := Replicant, streams := _}] + }, + #{ + core_node := Core2, + replicant_nodes := + [#{node := Replicant, streams := _}] + } + ], + lists:sort(Core1Resp) + ), + + %% force leave the core2 + {204} = rpc:call( + Core1, + emqx_mgmt_api_cluster, + force_leave, + [delete, #{bindings => #{node => atom_to_binary(Core2)}}] + ), + + %% assert the cluster is updated + {200, Core1Resp2} = rpc:call(Core1, emqx_mgmt_api_cluster, cluster_topology, [get, #{}]), + ?assertMatch( + [ + #{ + core_node := Core1, + replicant_nodes := + [#{node := Replicant, streams := _}] + } + ], + lists:sort(Core1Resp2) + ), + + %% assert timeout parameter checking + Invite = fun(Node, Timeout) -> + Node1 = atom_to_binary(Node), + rpc:call( + Core1, + emqx_mgmt_api_cluster, + invite_node, + [put, #{bindings => #{node => Node1}, body => #{<<"timeout">> => Timeout}}] + ) + end, + ?assertMatch( + {400, #{code := 'BAD_REQUEST', message := <<"timeout must be integer">>}}, + Invite(Core2, not_a_integer_timeout) + ), + ?assertMatch( + {400, #{code := 'BAD_REQUEST', message := <<"timeout can't less than 5000ms">>}}, + Invite(Core2, 3000) + ), + + %% assert cluster is updated after invite + ?assertMatch( + {200}, + Invite(Core2, 15000) + ), + {200, Core1Resp3} = rpc:call(Core1, emqx_mgmt_api_cluster, cluster_topology, [get, #{}]), + ?assertMatch( + [ + #{ + core_node := Core1, + replicant_nodes := + [#{node := Replicant, streams := _}] + }, + #{ + core_node := Core2, + replicant_nodes := _ + } + ], + lists:sort(Core1Resp3) + ). + cluster(Config) -> + NodeSpec = #{apps => ?APPS}, Nodes = emqx_cth_cluster:start( [ - {data_backup_core1, #{role => core, apps => ?APPS}}, - {data_backup_core2, #{role => core, apps => ?APPS}}, - {data_backup_replicant, #{role => replicant, apps => ?APPS}} + {data_backup_core1, NodeSpec#{role => core}}, + {data_backup_core2, NodeSpec#{role => core}}, + {data_backup_replicant, NodeSpec#{role => replicant}} ], #{work_dir => work_dir(Config)} ),