Merge pull request #12267 from HJianBo/new-timeout-param-for-invite
feat(cluster): supports inviting nodes to join the cluster in an asynchronous manner
This commit is contained in:
commit
b6d0365027
|
@ -41,6 +41,7 @@
|
||||||
{emqx_mgmt_api_plugins,2}.
|
{emqx_mgmt_api_plugins,2}.
|
||||||
{emqx_mgmt_cluster,1}.
|
{emqx_mgmt_cluster,1}.
|
||||||
{emqx_mgmt_cluster,2}.
|
{emqx_mgmt_cluster,2}.
|
||||||
|
{emqx_mgmt_cluster,3}.
|
||||||
{emqx_mgmt_data_backup,1}.
|
{emqx_mgmt_data_backup,1}.
|
||||||
{emqx_mgmt_trace,1}.
|
{emqx_mgmt_trace,1}.
|
||||||
{emqx_mgmt_trace,2}.
|
{emqx_mgmt_trace,2}.
|
||||||
|
|
|
@ -26,11 +26,15 @@
|
||||||
cluster_info/2,
|
cluster_info/2,
|
||||||
cluster_topology/2,
|
cluster_topology/2,
|
||||||
invite_node/2,
|
invite_node/2,
|
||||||
|
invite_node_async/2,
|
||||||
|
get_invitation_status/2,
|
||||||
force_leave/2,
|
force_leave/2,
|
||||||
join/1,
|
join/1,
|
||||||
connected_replicants/0
|
connected_replicants/0
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
-define(DEFAULT_INVITE_TIMEOUT, 15000).
|
||||||
|
|
||||||
namespace() -> "cluster".
|
namespace() -> "cluster".
|
||||||
|
|
||||||
api_spec() ->
|
api_spec() ->
|
||||||
|
@ -40,7 +44,9 @@ paths() ->
|
||||||
[
|
[
|
||||||
"/cluster",
|
"/cluster",
|
||||||
"/cluster/topology",
|
"/cluster/topology",
|
||||||
|
"/cluster/invitation",
|
||||||
"/cluster/:node/invite",
|
"/cluster/:node/invite",
|
||||||
|
"/cluster/:node/invite_async",
|
||||||
"/cluster/:node/force_leave"
|
"/cluster/:node/force_leave"
|
||||||
].
|
].
|
||||||
|
|
||||||
|
@ -70,6 +76,20 @@ schema("/cluster/topology") ->
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
schema("/cluster/invitation") ->
|
||||||
|
#{
|
||||||
|
'operationId' => get_invitation_status,
|
||||||
|
get => #{
|
||||||
|
desc => ?DESC(get_invitation_status),
|
||||||
|
tags => [<<"Cluster">>],
|
||||||
|
responses => #{
|
||||||
|
200 => ?HOCON(
|
||||||
|
?REF(invitation_status),
|
||||||
|
#{desc => <<"Get invitation progress created by async operation">>}
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
schema("/cluster/:node/invite") ->
|
schema("/cluster/:node/invite") ->
|
||||||
#{
|
#{
|
||||||
'operationId' => invite_node,
|
'operationId' => invite_node,
|
||||||
|
@ -77,6 +97,20 @@ schema("/cluster/:node/invite") ->
|
||||||
desc => ?DESC(invite_node),
|
desc => ?DESC(invite_node),
|
||||||
tags => [<<"Cluster">>],
|
tags => [<<"Cluster">>],
|
||||||
parameters => [hoconsc:ref(node)],
|
parameters => [hoconsc:ref(node)],
|
||||||
|
'requestBody' => hoconsc:ref(timeout),
|
||||||
|
responses => #{
|
||||||
|
200 => <<"ok">>,
|
||||||
|
400 => emqx_dashboard_swagger:error_codes(['BAD_REQUEST'])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
schema("/cluster/:node/invite_async") ->
|
||||||
|
#{
|
||||||
|
'operationId' => invite_node_async,
|
||||||
|
put => #{
|
||||||
|
desc => ?DESC(invite_node_async),
|
||||||
|
tags => [<<"Cluster">>],
|
||||||
|
parameters => [hoconsc:ref(node)],
|
||||||
responses => #{
|
responses => #{
|
||||||
200 => <<"ok">>,
|
200 => <<"ok">>,
|
||||||
400 => emqx_dashboard_swagger:error_codes(['BAD_REQUEST'])
|
400 => emqx_dashboard_swagger:error_codes(['BAD_REQUEST'])
|
||||||
|
@ -131,6 +165,71 @@ fields(core_replicants) ->
|
||||||
#{desc => <<"Core node name">>, example => <<"emqx-core@127.0.0.1">>}
|
#{desc => <<"Core node name">>, example => <<"emqx-core@127.0.0.1">>}
|
||||||
)},
|
)},
|
||||||
{replicant_nodes, ?HOCON(?ARRAY(?REF(replicant_info)))}
|
{replicant_nodes, ?HOCON(?ARRAY(?REF(replicant_info)))}
|
||||||
|
];
|
||||||
|
fields(timeout) ->
|
||||||
|
[
|
||||||
|
{timeout,
|
||||||
|
?HOCON(
|
||||||
|
non_neg_integer(),
|
||||||
|
#{desc => <<"Timeout in milliseconds">>, example => <<"15000">>}
|
||||||
|
)}
|
||||||
|
];
|
||||||
|
fields(invitation_status) ->
|
||||||
|
[
|
||||||
|
{succeed,
|
||||||
|
?HOCON(
|
||||||
|
?ARRAY(?REF(node_invitation_succeed)),
|
||||||
|
#{desc => <<"A list of information about nodes which are successfully invited">>}
|
||||||
|
)},
|
||||||
|
{in_progress,
|
||||||
|
?HOCON(
|
||||||
|
?ARRAY(?REF(node_invitation_in_progress)),
|
||||||
|
#{desc => <<"A list of information about nodes that are processing invitations">>}
|
||||||
|
)},
|
||||||
|
{failed,
|
||||||
|
?HOCON(
|
||||||
|
?ARRAY(?REF(node_invitation_failed)),
|
||||||
|
#{desc => <<"A list of information about nodes that failed to be invited">>}
|
||||||
|
)}
|
||||||
|
];
|
||||||
|
fields(node_invitation_failed) ->
|
||||||
|
fields(node_invitation_succeed) ++
|
||||||
|
[
|
||||||
|
{reason,
|
||||||
|
?HOCON(
|
||||||
|
binary(),
|
||||||
|
#{desc => <<"Failure reason">>, example => <<"Bad RPC to target node">>}
|
||||||
|
)}
|
||||||
|
];
|
||||||
|
fields(node_invitation_succeed) ->
|
||||||
|
fields(node_invitation_in_progress) ++
|
||||||
|
[
|
||||||
|
{finished_at,
|
||||||
|
?HOCON(
|
||||||
|
emqx_utils_calendar:epoch_millisecond(),
|
||||||
|
#{
|
||||||
|
desc =>
|
||||||
|
<<"The time of the async invitation result is received, millisecond precision epoch">>,
|
||||||
|
example => <<"1705044829915">>
|
||||||
|
}
|
||||||
|
)}
|
||||||
|
];
|
||||||
|
fields(node_invitation_in_progress) ->
|
||||||
|
[
|
||||||
|
{node,
|
||||||
|
?HOCON(
|
||||||
|
binary(),
|
||||||
|
#{desc => <<"Node name">>, example => <<"emqx2@127.0.0.1">>}
|
||||||
|
)},
|
||||||
|
{started_at,
|
||||||
|
?HOCON(
|
||||||
|
emqx_utils_calendar:epoch_millisecond(),
|
||||||
|
#{
|
||||||
|
desc =>
|
||||||
|
<<"The start timestamp of the invitation, millisecond precision epoch">>,
|
||||||
|
example => <<"1705044829915">>
|
||||||
|
}
|
||||||
|
)}
|
||||||
].
|
].
|
||||||
|
|
||||||
validate_node(Node) ->
|
validate_node(Node) ->
|
||||||
|
@ -188,19 +287,43 @@ running_cores() ->
|
||||||
Running = emqx:running_nodes(),
|
Running = emqx:running_nodes(),
|
||||||
lists:filter(fun(C) -> lists:member(C, Running) end, emqx:cluster_nodes(cores)).
|
lists:filter(fun(C) -> lists:member(C, Running) end, emqx:cluster_nodes(cores)).
|
||||||
|
|
||||||
invite_node(put, #{bindings := #{node := Node0}}) ->
|
invite_node(put, #{bindings := #{node := Node0}, body := Body}) ->
|
||||||
Node = ekka_node:parse_name(binary_to_list(Node0)),
|
Node = ekka_node:parse_name(binary_to_list(Node0)),
|
||||||
case emqx_mgmt_cluster_proto_v1:invite_node(Node, node()) of
|
case maps:get(<<"timeout">>, Body, ?DEFAULT_INVITE_TIMEOUT) of
|
||||||
|
T when not is_integer(T) ->
|
||||||
|
{400, #{code => 'BAD_REQUEST', message => <<"timeout must be an integer">>}};
|
||||||
|
T when T < 5000 ->
|
||||||
|
{400, #{code => 'BAD_REQUEST', message => <<"timeout cannot be less than 5000ms">>}};
|
||||||
|
Timeout ->
|
||||||
|
case emqx_mgmt_cluster_proto_v3:invite_node(Node, node(), Timeout) of
|
||||||
|
ok ->
|
||||||
|
{200};
|
||||||
|
ignore ->
|
||||||
|
{400, #{code => 'BAD_REQUEST', message => <<"Cannot invite self">>}};
|
||||||
|
{badrpc, Error} ->
|
||||||
|
{400, #{code => 'BAD_REQUEST', message => error_message(Error)}};
|
||||||
|
{error, Error} ->
|
||||||
|
{400, #{code => 'BAD_REQUEST', message => error_message(Error)}}
|
||||||
|
end
|
||||||
|
end.
|
||||||
|
|
||||||
|
invite_node_async(put, #{bindings := #{node := Node0}}) ->
|
||||||
|
Node = ekka_node:parse_name(binary_to_list(Node0)),
|
||||||
|
case emqx_mgmt_cluster:invite_async(Node) of
|
||||||
ok ->
|
ok ->
|
||||||
{200};
|
{200};
|
||||||
ignore ->
|
ignore ->
|
||||||
{400, #{code => 'BAD_REQUEST', message => <<"Can't invite self">>}};
|
{400, #{code => 'BAD_REQUEST', message => <<"Can't invite self">>}};
|
||||||
{badrpc, Error} ->
|
{error, {already_started, _Pid}} ->
|
||||||
{400, #{code => 'BAD_REQUEST', message => error_message(Error)}};
|
{400, #{
|
||||||
{error, Error} ->
|
code => 'BAD_REQUEST',
|
||||||
{400, #{code => 'BAD_REQUEST', message => error_message(Error)}}
|
message => <<"The invitation task already created for this node">>
|
||||||
|
}}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
get_invitation_status(get, _) ->
|
||||||
|
{200, format_invitation_status(emqx_mgmt_cluster:invitation_status())}.
|
||||||
|
|
||||||
force_leave(delete, #{bindings := #{node := Node0}}) ->
|
force_leave(delete, #{bindings := #{node := Node0}}) ->
|
||||||
Node = ekka_node:parse_name(binary_to_list(Node0)),
|
Node = ekka_node:parse_name(binary_to_list(Node0)),
|
||||||
case ekka:force_leave(Node) of
|
case ekka:force_leave(Node) of
|
||||||
|
@ -222,3 +345,27 @@ connected_replicants() ->
|
||||||
|
|
||||||
error_message(Msg) ->
|
error_message(Msg) ->
|
||||||
iolist_to_binary(io_lib:format("~p", [Msg])).
|
iolist_to_binary(io_lib:format("~p", [Msg])).
|
||||||
|
|
||||||
|
format_invitation_status(#{
|
||||||
|
succeed := Succeed,
|
||||||
|
in_progress := InProgress,
|
||||||
|
failed := Failed
|
||||||
|
}) ->
|
||||||
|
#{
|
||||||
|
succeed => format_invitation_info(Succeed),
|
||||||
|
in_progress => format_invitation_info(InProgress),
|
||||||
|
failed => format_invitation_info(Failed)
|
||||||
|
}.
|
||||||
|
|
||||||
|
format_invitation_info(L) when is_list(L) ->
|
||||||
|
lists:map(
|
||||||
|
fun(Info) ->
|
||||||
|
Info1 = emqx_utils_maps:update_if_present(
|
||||||
|
started_at, fun emqx_utils_calendar:epoch_to_rfc3339/1, Info
|
||||||
|
),
|
||||||
|
emqx_utils_maps:update_if_present(
|
||||||
|
finished_at, fun emqx_utils_calendar:epoch_to_rfc3339/1, Info1
|
||||||
|
)
|
||||||
|
end,
|
||||||
|
L
|
||||||
|
).
|
||||||
|
|
|
@ -0,0 +1,201 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-module(emqx_mgmt_cluster).
|
||||||
|
|
||||||
|
-behaviour(gen_server).
|
||||||
|
|
||||||
|
%% APIs
|
||||||
|
-export([start_link/0]).
|
||||||
|
|
||||||
|
-export([invite_async/1, invitation_status/0]).
|
||||||
|
|
||||||
|
%% gen_server callbacks
|
||||||
|
-export([
|
||||||
|
init/1,
|
||||||
|
handle_call/3,
|
||||||
|
handle_cast/2,
|
||||||
|
handle_info/2,
|
||||||
|
terminate/2,
|
||||||
|
code_change/3
|
||||||
|
]).
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% APIs
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
start_link() ->
|
||||||
|
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
|
||||||
|
|
||||||
|
-spec invite_async(atom()) -> ok | ignore | {error, {already_started, pid()}}.
|
||||||
|
invite_async(Node) ->
|
||||||
|
%% Proxy the invitation task to the leader node
|
||||||
|
JoinTo = mria_membership:leader(),
|
||||||
|
case Node =/= JoinTo of
|
||||||
|
true ->
|
||||||
|
gen_server:call({?MODULE, JoinTo}, {invite_async, Node, JoinTo}, infinity);
|
||||||
|
false ->
|
||||||
|
ignore
|
||||||
|
end.
|
||||||
|
|
||||||
|
-spec invitation_status() -> map().
|
||||||
|
invitation_status() ->
|
||||||
|
Leader = mria_membership:leader(),
|
||||||
|
gen_server:call({?MODULE, Leader}, invitation_status, infinity).
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% gen_server callbacks
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
init([]) ->
|
||||||
|
process_flag(trap_exit, true),
|
||||||
|
{ok, #{}}.
|
||||||
|
|
||||||
|
handle_call({invite_async, Node, JoinTo}, _From, State) ->
|
||||||
|
case maps:get(Node, State, undefined) of
|
||||||
|
undefined ->
|
||||||
|
Caller = self(),
|
||||||
|
Task = spawn_link_invite_worker(Node, JoinTo, Caller),
|
||||||
|
{reply, ok, State#{Node => Task}};
|
||||||
|
WorkerPid ->
|
||||||
|
{reply, {error, {already_started, WorkerPid}}, State}
|
||||||
|
end;
|
||||||
|
handle_call(invitation_status, _From, State) ->
|
||||||
|
{reply, state_to_invitation_status(State), State};
|
||||||
|
handle_call(_Request, _From, State) ->
|
||||||
|
Reply = ok,
|
||||||
|
{reply, Reply, State}.
|
||||||
|
|
||||||
|
handle_cast(_Msg, State) ->
|
||||||
|
{noreply, State}.
|
||||||
|
|
||||||
|
handle_info({task_done, _WorkerPid, Node, Result}, State) ->
|
||||||
|
case maps:take(Node, State) of
|
||||||
|
{Task, State1} ->
|
||||||
|
History = maps:get(history, State1, #{}),
|
||||||
|
Task1 = Task#{
|
||||||
|
result => Result,
|
||||||
|
finished_at => erlang:system_time(millisecond)
|
||||||
|
},
|
||||||
|
{noreply, State1#{history => History#{Node => Task1}}};
|
||||||
|
error ->
|
||||||
|
{noreply, State}
|
||||||
|
end;
|
||||||
|
handle_info({'EXIT', WorkerPid, Reason}, State) ->
|
||||||
|
case take_node_name_via_worker_pid(WorkerPid, State) of
|
||||||
|
{key_value, Node, Task, State1} ->
|
||||||
|
History = maps:get(history, State1, #{}),
|
||||||
|
Task1 = Task#{
|
||||||
|
result => {error, Reason},
|
||||||
|
finished_at => erlang:system_time(millisecond)
|
||||||
|
},
|
||||||
|
{noreply, State1#{history => History#{Node => Task1}}};
|
||||||
|
error ->
|
||||||
|
{noreply, State}
|
||||||
|
end;
|
||||||
|
handle_info(_Info, State) ->
|
||||||
|
{noreply, State}.
|
||||||
|
|
||||||
|
terminate(_Reason, _State) ->
|
||||||
|
ok.
|
||||||
|
|
||||||
|
code_change(_OldVsn, State, _Extra) ->
|
||||||
|
{ok, State}.
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Internal funcs
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
spawn_link_invite_worker(Node, JoinTo, Caller) ->
|
||||||
|
Pid = erlang:spawn_link(
|
||||||
|
fun() ->
|
||||||
|
Result =
|
||||||
|
case emqx_mgmt_cluster_proto_v3:invite_node(Node, JoinTo, infinity) of
|
||||||
|
ok ->
|
||||||
|
ok;
|
||||||
|
{error, {already_in_cluster, _Node}} ->
|
||||||
|
ok;
|
||||||
|
{error, _} = E ->
|
||||||
|
E;
|
||||||
|
{badrpc, Reason} ->
|
||||||
|
{error, {badrpc, Reason}}
|
||||||
|
end,
|
||||||
|
Caller ! {task_done, self(), Node, Result}
|
||||||
|
end
|
||||||
|
),
|
||||||
|
#{worker => Pid, started_at => erlang:system_time(millisecond)}.
|
||||||
|
|
||||||
|
take_node_name_via_worker_pid(WorkerPid, Map) when is_map(Map) ->
|
||||||
|
Key = find_node_name_via_worker_pid(WorkerPid, maps:next(maps:iterator(Map))),
|
||||||
|
case maps:take(Key, Map) of
|
||||||
|
error ->
|
||||||
|
error;
|
||||||
|
{Vaule, Map1} ->
|
||||||
|
{key_value, Key, Vaule, Map1}
|
||||||
|
end.
|
||||||
|
|
||||||
|
find_node_name_via_worker_pid(_WorkerPid, none) ->
|
||||||
|
error;
|
||||||
|
find_node_name_via_worker_pid(WorkerPid, {Key, Task, I}) ->
|
||||||
|
case maps:get(worker, Task, undefined) of
|
||||||
|
WorkerPid ->
|
||||||
|
Key;
|
||||||
|
_ ->
|
||||||
|
find_node_name_via_worker_pid(WorkerPid, maps:next(I))
|
||||||
|
end.
|
||||||
|
|
||||||
|
state_to_invitation_status(State) ->
|
||||||
|
History = maps:get(history, State, #{}),
|
||||||
|
{Succ, Failed} = lists:foldl(
|
||||||
|
fun({Node, Task}, {SuccAcc, FailedAcc}) ->
|
||||||
|
#{
|
||||||
|
started_at := StartedAt,
|
||||||
|
finished_at := FinishedAt,
|
||||||
|
result := Result
|
||||||
|
} = Task,
|
||||||
|
Ret = #{node => Node, started_at => StartedAt, finished_at => FinishedAt},
|
||||||
|
case is_succeed_result(Result) of
|
||||||
|
true ->
|
||||||
|
{[Ret | SuccAcc], FailedAcc};
|
||||||
|
false ->
|
||||||
|
{SuccAcc, [Ret#{reason => format_error_reason(Result)} | FailedAcc]}
|
||||||
|
end
|
||||||
|
end,
|
||||||
|
{[], []},
|
||||||
|
maps:to_list(History)
|
||||||
|
),
|
||||||
|
|
||||||
|
InPro = maps:fold(
|
||||||
|
fun(Node, _Task = #{started_at := StartedAt}, Acc) ->
|
||||||
|
[#{node => Node, started_at => StartedAt} | Acc]
|
||||||
|
end,
|
||||||
|
[],
|
||||||
|
maps:without([history], State)
|
||||||
|
),
|
||||||
|
#{succeed => Succ, in_progress => InPro, failed => Failed}.
|
||||||
|
|
||||||
|
is_succeed_result(Result) ->
|
||||||
|
case Result of
|
||||||
|
ok ->
|
||||||
|
true;
|
||||||
|
{error, {already_in_cluster, _Node}} ->
|
||||||
|
true;
|
||||||
|
_ ->
|
||||||
|
false
|
||||||
|
end.
|
||||||
|
|
||||||
|
format_error_reason(Term) ->
|
||||||
|
iolist_to_binary(io_lib:format("~0p", [Term])).
|
|
@ -33,7 +33,8 @@ init([]) ->
|
||||||
_ ->
|
_ ->
|
||||||
[]
|
[]
|
||||||
end,
|
end,
|
||||||
{ok, {{one_for_one, 1, 5}, Workers}}.
|
Cluster = child_spec(emqx_mgmt_cluster, 5000, worker),
|
||||||
|
{ok, {{one_for_one, 1, 5}, [Cluster | Workers]}}.
|
||||||
|
|
||||||
child_spec(Mod, Shutdown, Type) ->
|
child_spec(Mod, Shutdown, Type) ->
|
||||||
#{
|
#{
|
||||||
|
|
|
@ -0,0 +1,38 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-module(emqx_mgmt_cluster_proto_v3).
|
||||||
|
|
||||||
|
-behaviour(emqx_bpapi).
|
||||||
|
|
||||||
|
-export([
|
||||||
|
introduced_in/0,
|
||||||
|
invite_node/3,
|
||||||
|
connected_replicants/1
|
||||||
|
]).
|
||||||
|
|
||||||
|
-include_lib("emqx/include/bpapi.hrl").
|
||||||
|
|
||||||
|
introduced_in() ->
|
||||||
|
"5.5.0".
|
||||||
|
|
||||||
|
-spec invite_node(node(), node(), timeout()) -> ok | ignore | {error, term()} | emqx_rpc:badrpc().
|
||||||
|
invite_node(Node, Self, Timeout) when is_integer(Timeout); Timeout =:= infinity ->
|
||||||
|
rpc:call(Node, emqx_mgmt_api_cluster, join, [Self], Timeout).
|
||||||
|
|
||||||
|
-spec connected_replicants([node()]) -> emqx_rpc:multicall_result().
|
||||||
|
connected_replicants(Nodes) ->
|
||||||
|
rpc:multicall(Nodes, emqx_mgmt_api_cluster, connected_replicants, [], 30_000).
|
|
@ -1,5 +1,5 @@
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
%%
|
%%
|
||||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
%% you may not use this file except in compliance with the License.
|
%% you may not use this file except in compliance with the License.
|
||||||
|
@ -35,6 +35,12 @@ end_per_suite(_) ->
|
||||||
init_per_testcase(TC = t_cluster_topology_api_replicants, Config0) ->
|
init_per_testcase(TC = t_cluster_topology_api_replicants, Config0) ->
|
||||||
Config = [{tc_name, TC} | Config0],
|
Config = [{tc_name, TC} | Config0],
|
||||||
[{cluster, cluster(Config)} | setup(Config)];
|
[{cluster, cluster(Config)} | setup(Config)];
|
||||||
|
init_per_testcase(TC = t_cluster_invite_api_timeout, Config0) ->
|
||||||
|
Config = [{tc_name, TC} | Config0],
|
||||||
|
[{cluster, cluster(Config)} | setup(Config)];
|
||||||
|
init_per_testcase(TC = t_cluster_invite_async, Config0) ->
|
||||||
|
Config = [{tc_name, TC} | Config0],
|
||||||
|
[{cluster, cluster(Config)} | setup(Config)];
|
||||||
init_per_testcase(_TC, Config) ->
|
init_per_testcase(_TC, Config) ->
|
||||||
emqx_mgmt_api_test_util:init_suite(?APPS),
|
emqx_mgmt_api_test_util:init_suite(?APPS),
|
||||||
Config.
|
Config.
|
||||||
|
@ -42,6 +48,12 @@ init_per_testcase(_TC, Config) ->
|
||||||
end_per_testcase(t_cluster_topology_api_replicants, Config) ->
|
end_per_testcase(t_cluster_topology_api_replicants, Config) ->
|
||||||
emqx_cth_cluster:stop(?config(cluster, Config)),
|
emqx_cth_cluster:stop(?config(cluster, Config)),
|
||||||
cleanup(Config);
|
cleanup(Config);
|
||||||
|
end_per_testcase(t_cluster_invite_api_timeout, Config) ->
|
||||||
|
emqx_cth_cluster:stop(?config(cluster, Config)),
|
||||||
|
cleanup(Config);
|
||||||
|
end_per_testcase(t_cluster_invite_async, Config) ->
|
||||||
|
emqx_cth_cluster:stop(?config(cluster, Config)),
|
||||||
|
cleanup(Config);
|
||||||
end_per_testcase(_TC, _Config) ->
|
end_per_testcase(_TC, _Config) ->
|
||||||
emqx_mgmt_api_test_util:end_suite(?APPS).
|
emqx_mgmt_api_test_util:end_suite(?APPS).
|
||||||
|
|
||||||
|
@ -77,12 +89,186 @@ t_cluster_topology_api_replicants(Config) ->
|
||||||
|| Resp <- [lists:sort(R) || R <- [Core1Resp, Core2Resp, ReplResp]]
|
|| Resp <- [lists:sort(R) || R <- [Core1Resp, Core2Resp, ReplResp]]
|
||||||
].
|
].
|
||||||
|
|
||||||
|
t_cluster_invite_api_timeout(Config) ->
|
||||||
|
%% assert the cluster is created
|
||||||
|
[Core1, Core2, Replicant] = _NodesList = ?config(cluster, Config),
|
||||||
|
{200, Core1Resp} = rpc:call(Core1, emqx_mgmt_api_cluster, cluster_topology, [get, #{}]),
|
||||||
|
?assertMatch(
|
||||||
|
[
|
||||||
|
#{
|
||||||
|
core_node := Core1,
|
||||||
|
replicant_nodes :=
|
||||||
|
[#{node := Replicant, streams := _}]
|
||||||
|
},
|
||||||
|
#{
|
||||||
|
core_node := Core2,
|
||||||
|
replicant_nodes :=
|
||||||
|
[#{node := Replicant, streams := _}]
|
||||||
|
}
|
||||||
|
],
|
||||||
|
lists:sort(Core1Resp)
|
||||||
|
),
|
||||||
|
|
||||||
|
%% force leave the core2
|
||||||
|
{204} = rpc:call(
|
||||||
|
Core1,
|
||||||
|
emqx_mgmt_api_cluster,
|
||||||
|
force_leave,
|
||||||
|
[delete, #{bindings => #{node => atom_to_binary(Core2)}}]
|
||||||
|
),
|
||||||
|
|
||||||
|
%% assert the cluster is updated
|
||||||
|
{200, Core1Resp2} = rpc:call(Core1, emqx_mgmt_api_cluster, cluster_topology, [get, #{}]),
|
||||||
|
?assertMatch(
|
||||||
|
[
|
||||||
|
#{
|
||||||
|
core_node := Core1,
|
||||||
|
replicant_nodes :=
|
||||||
|
[#{node := Replicant, streams := _}]
|
||||||
|
}
|
||||||
|
],
|
||||||
|
lists:sort(Core1Resp2)
|
||||||
|
),
|
||||||
|
|
||||||
|
%% assert timeout parameter checking
|
||||||
|
Invite = fun(Node, Timeout) ->
|
||||||
|
Node1 = atom_to_binary(Node),
|
||||||
|
rpc:call(
|
||||||
|
Core1,
|
||||||
|
emqx_mgmt_api_cluster,
|
||||||
|
invite_node,
|
||||||
|
[put, #{bindings => #{node => Node1}, body => #{<<"timeout">> => Timeout}}]
|
||||||
|
)
|
||||||
|
end,
|
||||||
|
?assertMatch(
|
||||||
|
{400, #{code := 'BAD_REQUEST', message := <<"timeout must be an integer">>}},
|
||||||
|
Invite(Core2, not_a_integer_timeout)
|
||||||
|
),
|
||||||
|
?assertMatch(
|
||||||
|
{400, #{code := 'BAD_REQUEST', message := <<"timeout cannot be less than 5000ms">>}},
|
||||||
|
Invite(Core2, 3000)
|
||||||
|
),
|
||||||
|
|
||||||
|
%% assert cluster is updated after invite
|
||||||
|
?assertMatch(
|
||||||
|
{200},
|
||||||
|
Invite(Core2, 15000)
|
||||||
|
),
|
||||||
|
{200, Core1Resp3} = rpc:call(Core1, emqx_mgmt_api_cluster, cluster_topology, [get, #{}]),
|
||||||
|
?assertMatch(
|
||||||
|
[
|
||||||
|
#{
|
||||||
|
core_node := Core1,
|
||||||
|
replicant_nodes :=
|
||||||
|
[#{node := Replicant, streams := _}]
|
||||||
|
},
|
||||||
|
#{
|
||||||
|
core_node := Core2,
|
||||||
|
replicant_nodes := _
|
||||||
|
}
|
||||||
|
],
|
||||||
|
lists:sort(Core1Resp3)
|
||||||
|
).
|
||||||
|
|
||||||
|
t_cluster_invite_async(Config) ->
|
||||||
|
%% assert the cluster is created
|
||||||
|
[Core1, Core2, Replicant] = _NodesList = ?config(cluster, Config),
|
||||||
|
{200, Core1Resp} = rpc:call(Core1, emqx_mgmt_api_cluster, cluster_topology, [get, #{}]),
|
||||||
|
?assertMatch(
|
||||||
|
[
|
||||||
|
#{
|
||||||
|
core_node := Core1,
|
||||||
|
replicant_nodes :=
|
||||||
|
[#{node := Replicant, streams := _}]
|
||||||
|
},
|
||||||
|
#{
|
||||||
|
core_node := Core2,
|
||||||
|
replicant_nodes :=
|
||||||
|
[#{node := Replicant, streams := _}]
|
||||||
|
}
|
||||||
|
],
|
||||||
|
lists:sort(Core1Resp)
|
||||||
|
),
|
||||||
|
|
||||||
|
%% force leave the core2 and replicant
|
||||||
|
{204} = rpc:call(
|
||||||
|
Core1,
|
||||||
|
emqx_mgmt_api_cluster,
|
||||||
|
force_leave,
|
||||||
|
[delete, #{bindings => #{node => atom_to_binary(Core2)}}]
|
||||||
|
),
|
||||||
|
%% assert the cluster is updated
|
||||||
|
{200, Core1Resp2} = rpc:call(Core1, emqx_mgmt_api_cluster, cluster_topology, [get, #{}]),
|
||||||
|
?assertMatch(
|
||||||
|
[
|
||||||
|
#{
|
||||||
|
core_node := Core1,
|
||||||
|
replicant_nodes := [_]
|
||||||
|
}
|
||||||
|
],
|
||||||
|
lists:sort(Core1Resp2)
|
||||||
|
),
|
||||||
|
|
||||||
|
Invite = fun(Node) ->
|
||||||
|
Node1 = atom_to_binary(Node),
|
||||||
|
rpc:call(
|
||||||
|
Core1,
|
||||||
|
emqx_mgmt_api_cluster,
|
||||||
|
invite_node_async,
|
||||||
|
[put, #{bindings => #{node => Node1}}]
|
||||||
|
)
|
||||||
|
end,
|
||||||
|
|
||||||
|
%% parameter checking
|
||||||
|
?assertMatch(
|
||||||
|
{400, #{code := 'BAD_REQUEST', message := <<"Can't invite self">>}},
|
||||||
|
Invite(Core1)
|
||||||
|
),
|
||||||
|
?assertMatch(
|
||||||
|
{200},
|
||||||
|
Invite(Core2)
|
||||||
|
),
|
||||||
|
%% already invited
|
||||||
|
?assertMatch(
|
||||||
|
{400, #{
|
||||||
|
code := 'BAD_REQUEST',
|
||||||
|
message := <<"The invitation task already created for this node">>
|
||||||
|
}},
|
||||||
|
Invite(Core2)
|
||||||
|
),
|
||||||
|
|
||||||
|
%% assert: core2 is in_progress status
|
||||||
|
?assertMatch(
|
||||||
|
{200, #{in_progress := [#{node := Core2}]}},
|
||||||
|
rpc:call(Core1, emqx_mgmt_api_cluster, get_invitation_status, [get, #{}])
|
||||||
|
),
|
||||||
|
|
||||||
|
%% waiting the async invitation_succeed
|
||||||
|
?assertMatch({succeed, _}, waiting_the_async_invitation_succeed(Core1, Core2)),
|
||||||
|
|
||||||
|
{200, Core1Resp3} = rpc:call(Core1, emqx_mgmt_api_cluster, cluster_topology, [get, #{}]),
|
||||||
|
?assertMatch(
|
||||||
|
[
|
||||||
|
#{
|
||||||
|
core_node := Core1,
|
||||||
|
replicant_nodes :=
|
||||||
|
[#{node := Replicant, streams := _}]
|
||||||
|
},
|
||||||
|
#{
|
||||||
|
core_node := Core2,
|
||||||
|
replicant_nodes := _
|
||||||
|
}
|
||||||
|
],
|
||||||
|
lists:sort(Core1Resp3)
|
||||||
|
).
|
||||||
|
|
||||||
cluster(Config) ->
|
cluster(Config) ->
|
||||||
|
NodeSpec = #{apps => ?APPS},
|
||||||
Nodes = emqx_cth_cluster:start(
|
Nodes = emqx_cth_cluster:start(
|
||||||
[
|
[
|
||||||
{data_backup_core1, #{role => core, apps => ?APPS}},
|
{data_backup_core1, NodeSpec#{role => core}},
|
||||||
{data_backup_core2, #{role => core, apps => ?APPS}},
|
{data_backup_core2, NodeSpec#{role => core}},
|
||||||
{data_backup_replicant, #{role => replicant, apps => ?APPS}}
|
{data_backup_replicant, NodeSpec#{role => replicant}}
|
||||||
],
|
],
|
||||||
#{work_dir => work_dir(Config)}
|
#{work_dir => work_dir(Config)}
|
||||||
),
|
),
|
||||||
|
@ -98,3 +284,37 @@ cleanup(Config) ->
|
||||||
|
|
||||||
work_dir(Config) ->
|
work_dir(Config) ->
|
||||||
filename:join(?config(priv_dir, Config), ?config(tc_name, Config)).
|
filename:join(?config(priv_dir, Config), ?config(tc_name, Config)).
|
||||||
|
|
||||||
|
waiting_the_async_invitation_succeed(Node, TargetNode) ->
|
||||||
|
waiting_the_async_invitation_succeed(Node, TargetNode, 100).
|
||||||
|
|
||||||
|
waiting_the_async_invitation_succeed(_Node, _TargetNode, 0) ->
|
||||||
|
error(timeout);
|
||||||
|
waiting_the_async_invitation_succeed(Node, TargetNode, N) ->
|
||||||
|
{200, #{
|
||||||
|
in_progress := InProgress,
|
||||||
|
succeed := Succeed,
|
||||||
|
failed := Failed
|
||||||
|
}} = rpc:call(Node, emqx_mgmt_api_cluster, get_invitation_status, [get, #{}]),
|
||||||
|
case find_node_info_list(TargetNode, InProgress) of
|
||||||
|
error ->
|
||||||
|
case find_node_info_list(TargetNode, Succeed) of
|
||||||
|
error ->
|
||||||
|
case find_node_info_list(TargetNode, Failed) of
|
||||||
|
error -> error;
|
||||||
|
Info1 -> {failed, Info1}
|
||||||
|
end;
|
||||||
|
Info2 ->
|
||||||
|
{succeed, Info2}
|
||||||
|
end;
|
||||||
|
_Info ->
|
||||||
|
timer:sleep(1000),
|
||||||
|
waiting_the_async_invitation_succeed(Node, TargetNode, N - 1)
|
||||||
|
end.
|
||||||
|
|
||||||
|
find_node_info_list(Node, List) ->
|
||||||
|
L = lists:filter(fun(#{node := N}) -> N =:= Node end, List),
|
||||||
|
case L of
|
||||||
|
[] -> error;
|
||||||
|
[Info] -> Info
|
||||||
|
end.
|
||||||
|
|
|
@ -0,0 +1,5 @@
|
||||||
|
Add a new `timeout` parameter to the `cluster/:node/invite` interface.
|
||||||
|
Previously the default timeout was 5s which would often be caused by HTTP API calls due to emqx taking too long to join cluster.
|
||||||
|
|
||||||
|
Add a new endpoint `/cluster/:node/invite_async` to support an asynchronous way to invite nodes to join the cluster,
|
||||||
|
and a new endpoint `cluster/invitation` to inspect the join status.
|
|
@ -15,9 +15,19 @@ invite_node.desc:
|
||||||
invite_node.label:
|
invite_node.label:
|
||||||
"""Invite node to cluster"""
|
"""Invite node to cluster"""
|
||||||
|
|
||||||
|
invite_node_async.desc:
|
||||||
|
"""Send a join invitation to a node to join the cluster but do not wait for the join result. Join status can be retrieved with `GET api/<version>/invitation`"""
|
||||||
|
invite_node_async.label:
|
||||||
|
"""Asynchronously invite"""
|
||||||
|
|
||||||
force_remove_node.desc:
|
force_remove_node.desc:
|
||||||
"""Force leave node from cluster"""
|
"""Force leave node from cluster"""
|
||||||
force_remove_node.label:
|
force_remove_node.label:
|
||||||
"""Force leave node from cluster"""
|
"""Force leave node from cluster"""
|
||||||
|
|
||||||
|
get_invitation_status.desc:
|
||||||
|
"""Get the execution status of all asynchronous invite status per node"""
|
||||||
|
get_invitation_status.label:
|
||||||
|
"""Get invitation statuses"""
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue