diff --git a/apps/emqx/priv/bpapi.versions b/apps/emqx/priv/bpapi.versions index 2777aec53..9721a7f2f 100644 --- a/apps/emqx/priv/bpapi.versions +++ b/apps/emqx/priv/bpapi.versions @@ -41,6 +41,7 @@ {emqx_mgmt_api_plugins,2}. {emqx_mgmt_cluster,1}. {emqx_mgmt_cluster,2}. +{emqx_mgmt_cluster,3}. {emqx_mgmt_data_backup,1}. {emqx_mgmt_trace,1}. {emqx_mgmt_trace,2}. diff --git a/apps/emqx_management/src/emqx_mgmt_api_cluster.erl b/apps/emqx_management/src/emqx_mgmt_api_cluster.erl index be1b8e354..686a0be71 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_cluster.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_cluster.erl @@ -26,11 +26,15 @@ cluster_info/2, cluster_topology/2, invite_node/2, + invite_node_async/2, + get_invitation_status/2, force_leave/2, join/1, connected_replicants/0 ]). +-define(DEFAULT_INVITE_TIMEOUT, 15000). + namespace() -> "cluster". api_spec() -> @@ -40,7 +44,9 @@ paths() -> [ "/cluster", "/cluster/topology", + "/cluster/invitation", "/cluster/:node/invite", + "/cluster/:node/invite_async", "/cluster/:node/force_leave" ]. @@ -70,6 +76,20 @@ schema("/cluster/topology") -> } } }; +schema("/cluster/invitation") -> + #{ + 'operationId' => get_invitation_status, + get => #{ + desc => ?DESC(get_invitation_status), + tags => [<<"Cluster">>], + responses => #{ + 200 => ?HOCON( + ?REF(invitation_status), + #{desc => <<"Get invitation progress created by async operation">>} + ) + } + } + }; schema("/cluster/:node/invite") -> #{ 'operationId' => invite_node, @@ -77,6 +97,20 @@ schema("/cluster/:node/invite") -> desc => ?DESC(invite_node), tags => [<<"Cluster">>], parameters => [hoconsc:ref(node)], + 'requestBody' => hoconsc:ref(timeout), + responses => #{ + 200 => <<"ok">>, + 400 => emqx_dashboard_swagger:error_codes(['BAD_REQUEST']) + } + } + }; +schema("/cluster/:node/invite_async") -> + #{ + 'operationId' => invite_node_async, + put => #{ + desc => ?DESC(invite_node_async), + tags => [<<"Cluster">>], + parameters => [hoconsc:ref(node)], responses => #{ 200 => <<"ok">>, 400 => emqx_dashboard_swagger:error_codes(['BAD_REQUEST']) @@ -131,6 +165,71 @@ fields(core_replicants) -> #{desc => <<"Core node name">>, example => <<"emqx-core@127.0.0.1">>} )}, {replicant_nodes, ?HOCON(?ARRAY(?REF(replicant_info)))} + ]; +fields(timeout) -> + [ + {timeout, + ?HOCON( + non_neg_integer(), + #{desc => <<"Timeout in milliseconds">>, example => <<"15000">>} + )} + ]; +fields(invitation_status) -> + [ + {succeed, + ?HOCON( + ?ARRAY(?REF(node_invitation_succeed)), + #{desc => <<"A list of information about nodes which are successfully invited">>} + )}, + {in_progress, + ?HOCON( + ?ARRAY(?REF(node_invitation_in_progress)), + #{desc => <<"A list of information about nodes that are processing invitations">>} + )}, + {failed, + ?HOCON( + ?ARRAY(?REF(node_invitation_failed)), + #{desc => <<"A list of information about nodes that failed to be invited">>} + )} + ]; +fields(node_invitation_failed) -> + fields(node_invitation_succeed) ++ + [ + {reason, + ?HOCON( + binary(), + #{desc => <<"Failure reason">>, example => <<"Bad RPC to target node">>} + )} + ]; +fields(node_invitation_succeed) -> + fields(node_invitation_in_progress) ++ + [ + {finished_at, + ?HOCON( + emqx_utils_calendar:epoch_millisecond(), + #{ + desc => + <<"The time of the async invitation result is received, millisecond precision epoch">>, + example => <<"1705044829915">> + } + )} + ]; +fields(node_invitation_in_progress) -> + [ + {node, + ?HOCON( + binary(), + #{desc => <<"Node name">>, example => <<"emqx2@127.0.0.1">>} + )}, + {started_at, + ?HOCON( + emqx_utils_calendar:epoch_millisecond(), + #{ + desc => + <<"The start timestamp of the invitation, millisecond precision epoch">>, + example => <<"1705044829915">> + } + )} ]. validate_node(Node) -> @@ -188,19 +287,43 @@ running_cores() -> Running = emqx:running_nodes(), lists:filter(fun(C) -> lists:member(C, Running) end, emqx:cluster_nodes(cores)). -invite_node(put, #{bindings := #{node := Node0}}) -> +invite_node(put, #{bindings := #{node := Node0}, body := Body}) -> Node = ekka_node:parse_name(binary_to_list(Node0)), - case emqx_mgmt_cluster_proto_v1:invite_node(Node, node()) of + case maps:get(<<"timeout">>, Body, ?DEFAULT_INVITE_TIMEOUT) of + T when not is_integer(T) -> + {400, #{code => 'BAD_REQUEST', message => <<"timeout must be an integer">>}}; + T when T < 5000 -> + {400, #{code => 'BAD_REQUEST', message => <<"timeout cannot be less than 5000ms">>}}; + Timeout -> + case emqx_mgmt_cluster_proto_v3:invite_node(Node, node(), Timeout) of + ok -> + {200}; + ignore -> + {400, #{code => 'BAD_REQUEST', message => <<"Cannot invite self">>}}; + {badrpc, Error} -> + {400, #{code => 'BAD_REQUEST', message => error_message(Error)}}; + {error, Error} -> + {400, #{code => 'BAD_REQUEST', message => error_message(Error)}} + end + end. + +invite_node_async(put, #{bindings := #{node := Node0}}) -> + Node = ekka_node:parse_name(binary_to_list(Node0)), + case emqx_mgmt_cluster:invite_async(Node) of ok -> {200}; ignore -> {400, #{code => 'BAD_REQUEST', message => <<"Can't invite self">>}}; - {badrpc, Error} -> - {400, #{code => 'BAD_REQUEST', message => error_message(Error)}}; - {error, Error} -> - {400, #{code => 'BAD_REQUEST', message => error_message(Error)}} + {error, {already_started, _Pid}} -> + {400, #{ + code => 'BAD_REQUEST', + message => <<"The invitation task already created for this node">> + }} end. +get_invitation_status(get, _) -> + {200, format_invitation_status(emqx_mgmt_cluster:invitation_status())}. + force_leave(delete, #{bindings := #{node := Node0}}) -> Node = ekka_node:parse_name(binary_to_list(Node0)), case ekka:force_leave(Node) of @@ -222,3 +345,27 @@ connected_replicants() -> error_message(Msg) -> iolist_to_binary(io_lib:format("~p", [Msg])). + +format_invitation_status(#{ + succeed := Succeed, + in_progress := InProgress, + failed := Failed +}) -> + #{ + succeed => format_invitation_info(Succeed), + in_progress => format_invitation_info(InProgress), + failed => format_invitation_info(Failed) + }. + +format_invitation_info(L) when is_list(L) -> + lists:map( + fun(Info) -> + Info1 = emqx_utils_maps:update_if_present( + started_at, fun emqx_utils_calendar:epoch_to_rfc3339/1, Info + ), + emqx_utils_maps:update_if_present( + finished_at, fun emqx_utils_calendar:epoch_to_rfc3339/1, Info1 + ) + end, + L + ). diff --git a/apps/emqx_management/src/emqx_mgmt_cluster.erl b/apps/emqx_management/src/emqx_mgmt_cluster.erl new file mode 100644 index 000000000..828567776 --- /dev/null +++ b/apps/emqx_management/src/emqx_mgmt_cluster.erl @@ -0,0 +1,201 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_mgmt_cluster). + +-behaviour(gen_server). + +%% APIs +-export([start_link/0]). + +-export([invite_async/1, invitation_status/0]). + +%% gen_server callbacks +-export([ + init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3 +]). + +%%-------------------------------------------------------------------- +%% APIs +%%-------------------------------------------------------------------- + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +-spec invite_async(atom()) -> ok | ignore | {error, {already_started, pid()}}. +invite_async(Node) -> + %% Proxy the invitation task to the leader node + JoinTo = mria_membership:leader(), + case Node =/= JoinTo of + true -> + gen_server:call({?MODULE, JoinTo}, {invite_async, Node, JoinTo}, infinity); + false -> + ignore + end. + +-spec invitation_status() -> map(). +invitation_status() -> + Leader = mria_membership:leader(), + gen_server:call({?MODULE, Leader}, invitation_status, infinity). + +%%-------------------------------------------------------------------- +%% gen_server callbacks +%%-------------------------------------------------------------------- + +init([]) -> + process_flag(trap_exit, true), + {ok, #{}}. + +handle_call({invite_async, Node, JoinTo}, _From, State) -> + case maps:get(Node, State, undefined) of + undefined -> + Caller = self(), + Task = spawn_link_invite_worker(Node, JoinTo, Caller), + {reply, ok, State#{Node => Task}}; + WorkerPid -> + {reply, {error, {already_started, WorkerPid}}, State} + end; +handle_call(invitation_status, _From, State) -> + {reply, state_to_invitation_status(State), State}; +handle_call(_Request, _From, State) -> + Reply = ok, + {reply, Reply, State}. + +handle_cast(_Msg, State) -> + {noreply, State}. + +handle_info({task_done, _WorkerPid, Node, Result}, State) -> + case maps:take(Node, State) of + {Task, State1} -> + History = maps:get(history, State1, #{}), + Task1 = Task#{ + result => Result, + finished_at => erlang:system_time(millisecond) + }, + {noreply, State1#{history => History#{Node => Task1}}}; + error -> + {noreply, State} + end; +handle_info({'EXIT', WorkerPid, Reason}, State) -> + case take_node_name_via_worker_pid(WorkerPid, State) of + {key_value, Node, Task, State1} -> + History = maps:get(history, State1, #{}), + Task1 = Task#{ + result => {error, Reason}, + finished_at => erlang:system_time(millisecond) + }, + {noreply, State1#{history => History#{Node => Task1}}}; + error -> + {noreply, State} + end; +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%-------------------------------------------------------------------- +%% Internal funcs +%%-------------------------------------------------------------------- + +spawn_link_invite_worker(Node, JoinTo, Caller) -> + Pid = erlang:spawn_link( + fun() -> + Result = + case emqx_mgmt_cluster_proto_v3:invite_node(Node, JoinTo, infinity) of + ok -> + ok; + {error, {already_in_cluster, _Node}} -> + ok; + {error, _} = E -> + E; + {badrpc, Reason} -> + {error, {badrpc, Reason}} + end, + Caller ! {task_done, self(), Node, Result} + end + ), + #{worker => Pid, started_at => erlang:system_time(millisecond)}. + +take_node_name_via_worker_pid(WorkerPid, Map) when is_map(Map) -> + Key = find_node_name_via_worker_pid(WorkerPid, maps:next(maps:iterator(Map))), + case maps:take(Key, Map) of + error -> + error; + {Vaule, Map1} -> + {key_value, Key, Vaule, Map1} + end. + +find_node_name_via_worker_pid(_WorkerPid, none) -> + error; +find_node_name_via_worker_pid(WorkerPid, {Key, Task, I}) -> + case maps:get(worker, Task, undefined) of + WorkerPid -> + Key; + _ -> + find_node_name_via_worker_pid(WorkerPid, maps:next(I)) + end. + +state_to_invitation_status(State) -> + History = maps:get(history, State, #{}), + {Succ, Failed} = lists:foldl( + fun({Node, Task}, {SuccAcc, FailedAcc}) -> + #{ + started_at := StartedAt, + finished_at := FinishedAt, + result := Result + } = Task, + Ret = #{node => Node, started_at => StartedAt, finished_at => FinishedAt}, + case is_succeed_result(Result) of + true -> + {[Ret | SuccAcc], FailedAcc}; + false -> + {SuccAcc, [Ret#{reason => format_error_reason(Result)} | FailedAcc]} + end + end, + {[], []}, + maps:to_list(History) + ), + + InPro = maps:fold( + fun(Node, _Task = #{started_at := StartedAt}, Acc) -> + [#{node => Node, started_at => StartedAt} | Acc] + end, + [], + maps:without([history], State) + ), + #{succeed => Succ, in_progress => InPro, failed => Failed}. + +is_succeed_result(Result) -> + case Result of + ok -> + true; + {error, {already_in_cluster, _Node}} -> + true; + _ -> + false + end. + +format_error_reason(Term) -> + iolist_to_binary(io_lib:format("~0p", [Term])). diff --git a/apps/emqx_management/src/emqx_mgmt_sup.erl b/apps/emqx_management/src/emqx_mgmt_sup.erl index 713ff87dc..5bd8632c3 100644 --- a/apps/emqx_management/src/emqx_mgmt_sup.erl +++ b/apps/emqx_management/src/emqx_mgmt_sup.erl @@ -33,7 +33,8 @@ init([]) -> _ -> [] end, - {ok, {{one_for_one, 1, 5}, Workers}}. + Cluster = child_spec(emqx_mgmt_cluster, 5000, worker), + {ok, {{one_for_one, 1, 5}, [Cluster | Workers]}}. child_spec(Mod, Shutdown, Type) -> #{ diff --git a/apps/emqx_management/src/proto/emqx_mgmt_cluster_proto_v3.erl b/apps/emqx_management/src/proto/emqx_mgmt_cluster_proto_v3.erl new file mode 100644 index 000000000..b00d63e1d --- /dev/null +++ b/apps/emqx_management/src/proto/emqx_mgmt_cluster_proto_v3.erl @@ -0,0 +1,38 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_mgmt_cluster_proto_v3). + +-behaviour(emqx_bpapi). + +-export([ + introduced_in/0, + invite_node/3, + connected_replicants/1 +]). + +-include_lib("emqx/include/bpapi.hrl"). + +introduced_in() -> + "5.5.0". + +-spec invite_node(node(), node(), timeout()) -> ok | ignore | {error, term()} | emqx_rpc:badrpc(). +invite_node(Node, Self, Timeout) when is_integer(Timeout); Timeout =:= infinity -> + rpc:call(Node, emqx_mgmt_api_cluster, join, [Self], Timeout). + +-spec connected_replicants([node()]) -> emqx_rpc:multicall_result(). +connected_replicants(Nodes) -> + rpc:multicall(Nodes, emqx_mgmt_api_cluster, connected_replicants, [], 30_000). diff --git a/apps/emqx_management/test/emqx_mgmt_api_cluster_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_cluster_SUITE.erl index 6e33c4001..b2658f8fa 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_cluster_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_cluster_SUITE.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -35,6 +35,12 @@ end_per_suite(_) -> init_per_testcase(TC = t_cluster_topology_api_replicants, Config0) -> Config = [{tc_name, TC} | Config0], [{cluster, cluster(Config)} | setup(Config)]; +init_per_testcase(TC = t_cluster_invite_api_timeout, Config0) -> + Config = [{tc_name, TC} | Config0], + [{cluster, cluster(Config)} | setup(Config)]; +init_per_testcase(TC = t_cluster_invite_async, Config0) -> + Config = [{tc_name, TC} | Config0], + [{cluster, cluster(Config)} | setup(Config)]; init_per_testcase(_TC, Config) -> emqx_mgmt_api_test_util:init_suite(?APPS), Config. @@ -42,6 +48,12 @@ init_per_testcase(_TC, Config) -> end_per_testcase(t_cluster_topology_api_replicants, Config) -> emqx_cth_cluster:stop(?config(cluster, Config)), cleanup(Config); +end_per_testcase(t_cluster_invite_api_timeout, Config) -> + emqx_cth_cluster:stop(?config(cluster, Config)), + cleanup(Config); +end_per_testcase(t_cluster_invite_async, Config) -> + emqx_cth_cluster:stop(?config(cluster, Config)), + cleanup(Config); end_per_testcase(_TC, _Config) -> emqx_mgmt_api_test_util:end_suite(?APPS). @@ -77,12 +89,186 @@ t_cluster_topology_api_replicants(Config) -> || Resp <- [lists:sort(R) || R <- [Core1Resp, Core2Resp, ReplResp]] ]. +t_cluster_invite_api_timeout(Config) -> + %% assert the cluster is created + [Core1, Core2, Replicant] = _NodesList = ?config(cluster, Config), + {200, Core1Resp} = rpc:call(Core1, emqx_mgmt_api_cluster, cluster_topology, [get, #{}]), + ?assertMatch( + [ + #{ + core_node := Core1, + replicant_nodes := + [#{node := Replicant, streams := _}] + }, + #{ + core_node := Core2, + replicant_nodes := + [#{node := Replicant, streams := _}] + } + ], + lists:sort(Core1Resp) + ), + + %% force leave the core2 + {204} = rpc:call( + Core1, + emqx_mgmt_api_cluster, + force_leave, + [delete, #{bindings => #{node => atom_to_binary(Core2)}}] + ), + + %% assert the cluster is updated + {200, Core1Resp2} = rpc:call(Core1, emqx_mgmt_api_cluster, cluster_topology, [get, #{}]), + ?assertMatch( + [ + #{ + core_node := Core1, + replicant_nodes := + [#{node := Replicant, streams := _}] + } + ], + lists:sort(Core1Resp2) + ), + + %% assert timeout parameter checking + Invite = fun(Node, Timeout) -> + Node1 = atom_to_binary(Node), + rpc:call( + Core1, + emqx_mgmt_api_cluster, + invite_node, + [put, #{bindings => #{node => Node1}, body => #{<<"timeout">> => Timeout}}] + ) + end, + ?assertMatch( + {400, #{code := 'BAD_REQUEST', message := <<"timeout must be an integer">>}}, + Invite(Core2, not_a_integer_timeout) + ), + ?assertMatch( + {400, #{code := 'BAD_REQUEST', message := <<"timeout cannot be less than 5000ms">>}}, + Invite(Core2, 3000) + ), + + %% assert cluster is updated after invite + ?assertMatch( + {200}, + Invite(Core2, 15000) + ), + {200, Core1Resp3} = rpc:call(Core1, emqx_mgmt_api_cluster, cluster_topology, [get, #{}]), + ?assertMatch( + [ + #{ + core_node := Core1, + replicant_nodes := + [#{node := Replicant, streams := _}] + }, + #{ + core_node := Core2, + replicant_nodes := _ + } + ], + lists:sort(Core1Resp3) + ). + +t_cluster_invite_async(Config) -> + %% assert the cluster is created + [Core1, Core2, Replicant] = _NodesList = ?config(cluster, Config), + {200, Core1Resp} = rpc:call(Core1, emqx_mgmt_api_cluster, cluster_topology, [get, #{}]), + ?assertMatch( + [ + #{ + core_node := Core1, + replicant_nodes := + [#{node := Replicant, streams := _}] + }, + #{ + core_node := Core2, + replicant_nodes := + [#{node := Replicant, streams := _}] + } + ], + lists:sort(Core1Resp) + ), + + %% force leave the core2 and replicant + {204} = rpc:call( + Core1, + emqx_mgmt_api_cluster, + force_leave, + [delete, #{bindings => #{node => atom_to_binary(Core2)}}] + ), + %% assert the cluster is updated + {200, Core1Resp2} = rpc:call(Core1, emqx_mgmt_api_cluster, cluster_topology, [get, #{}]), + ?assertMatch( + [ + #{ + core_node := Core1, + replicant_nodes := [_] + } + ], + lists:sort(Core1Resp2) + ), + + Invite = fun(Node) -> + Node1 = atom_to_binary(Node), + rpc:call( + Core1, + emqx_mgmt_api_cluster, + invite_node_async, + [put, #{bindings => #{node => Node1}}] + ) + end, + + %% parameter checking + ?assertMatch( + {400, #{code := 'BAD_REQUEST', message := <<"Can't invite self">>}}, + Invite(Core1) + ), + ?assertMatch( + {200}, + Invite(Core2) + ), + %% already invited + ?assertMatch( + {400, #{ + code := 'BAD_REQUEST', + message := <<"The invitation task already created for this node">> + }}, + Invite(Core2) + ), + + %% assert: core2 is in_progress status + ?assertMatch( + {200, #{in_progress := [#{node := Core2}]}}, + rpc:call(Core1, emqx_mgmt_api_cluster, get_invitation_status, [get, #{}]) + ), + + %% waiting the async invitation_succeed + ?assertMatch({succeed, _}, waiting_the_async_invitation_succeed(Core1, Core2)), + + {200, Core1Resp3} = rpc:call(Core1, emqx_mgmt_api_cluster, cluster_topology, [get, #{}]), + ?assertMatch( + [ + #{ + core_node := Core1, + replicant_nodes := + [#{node := Replicant, streams := _}] + }, + #{ + core_node := Core2, + replicant_nodes := _ + } + ], + lists:sort(Core1Resp3) + ). + cluster(Config) -> + NodeSpec = #{apps => ?APPS}, Nodes = emqx_cth_cluster:start( [ - {data_backup_core1, #{role => core, apps => ?APPS}}, - {data_backup_core2, #{role => core, apps => ?APPS}}, - {data_backup_replicant, #{role => replicant, apps => ?APPS}} + {data_backup_core1, NodeSpec#{role => core}}, + {data_backup_core2, NodeSpec#{role => core}}, + {data_backup_replicant, NodeSpec#{role => replicant}} ], #{work_dir => work_dir(Config)} ), @@ -98,3 +284,37 @@ cleanup(Config) -> work_dir(Config) -> filename:join(?config(priv_dir, Config), ?config(tc_name, Config)). + +waiting_the_async_invitation_succeed(Node, TargetNode) -> + waiting_the_async_invitation_succeed(Node, TargetNode, 100). + +waiting_the_async_invitation_succeed(_Node, _TargetNode, 0) -> + error(timeout); +waiting_the_async_invitation_succeed(Node, TargetNode, N) -> + {200, #{ + in_progress := InProgress, + succeed := Succeed, + failed := Failed + }} = rpc:call(Node, emqx_mgmt_api_cluster, get_invitation_status, [get, #{}]), + case find_node_info_list(TargetNode, InProgress) of + error -> + case find_node_info_list(TargetNode, Succeed) of + error -> + case find_node_info_list(TargetNode, Failed) of + error -> error; + Info1 -> {failed, Info1} + end; + Info2 -> + {succeed, Info2} + end; + _Info -> + timer:sleep(1000), + waiting_the_async_invitation_succeed(Node, TargetNode, N - 1) + end. + +find_node_info_list(Node, List) -> + L = lists:filter(fun(#{node := N}) -> N =:= Node end, List), + case L of + [] -> error; + [Info] -> Info + end. diff --git a/changes/ce/feat-12267.en.md b/changes/ce/feat-12267.en.md new file mode 100644 index 000000000..5574918a4 --- /dev/null +++ b/changes/ce/feat-12267.en.md @@ -0,0 +1,5 @@ +Add a new `timeout` parameter to the `cluster/:node/invite` interface. +Previously the default timeout was 5s which would often be caused by HTTP API calls due to emqx taking too long to join cluster. + +Add a new endpoint `/cluster/:node/invite_async` to support an asynchronous way to invite nodes to join the cluster, +and a new endpoint `cluster/invitation` to inspect the join status. diff --git a/rel/i18n/emqx_mgmt_api_cluster.hocon b/rel/i18n/emqx_mgmt_api_cluster.hocon index 0e6fceba1..24de515d9 100644 --- a/rel/i18n/emqx_mgmt_api_cluster.hocon +++ b/rel/i18n/emqx_mgmt_api_cluster.hocon @@ -15,9 +15,19 @@ invite_node.desc: invite_node.label: """Invite node to cluster""" +invite_node_async.desc: +"""Send a join invitation to a node to join the cluster but do not wait for the join result. Join status can be retrieved with `GET api//invitation`""" +invite_node_async.label: +"""Asynchronously invite""" + force_remove_node.desc: """Force leave node from cluster""" force_remove_node.label: """Force leave node from cluster""" +get_invitation_status.desc: +"""Get the execution status of all asynchronous invite status per node""" +get_invitation_status.label: +"""Get invitation statuses""" + }