chore: forward the async invite to leader node

This commit is contained in:
JianBo He 2024-01-13 15:06:38 +08:00
parent 93ef6766ef
commit 4c40e754f4
5 changed files with 32 additions and 22 deletions

View File

@ -41,6 +41,7 @@
{emqx_mgmt_api_plugins,2}. {emqx_mgmt_api_plugins,2}.
{emqx_mgmt_cluster,1}. {emqx_mgmt_cluster,1}.
{emqx_mgmt_cluster,2}. {emqx_mgmt_cluster,2}.
{emqx_mgmt_cluster,3}.
{emqx_mgmt_data_backup,1}. {emqx_mgmt_data_backup,1}.
{emqx_mgmt_trace,1}. {emqx_mgmt_trace,1}.
{emqx_mgmt_trace,2}. {emqx_mgmt_trace,2}.

View File

@ -78,13 +78,13 @@ schema("/cluster/topology") ->
}; };
schema("/cluster/invitation") -> schema("/cluster/invitation") ->
#{ #{
'operationId' => get_invitation_view, 'operationId' => get_invitation_status,
get => #{ get => #{
desc => ?DESC(get_invitation_view), desc => ?DESC(get_invitation_status),
tags => [<<"Cluster">>], tags => [<<"Cluster">>],
responses => #{ responses => #{
200 => ?HOCON( 200 => ?HOCON(
?REF(invitation_view), ?REF(invitation_status),
#{desc => <<"Get invitation progress created by async operation">>} #{desc => <<"Get invitation progress created by async operation">>}
) )
} }
@ -174,7 +174,7 @@ fields(timeout) ->
#{desc => <<"Timeout in milliseconds">>, example => <<"15000">>} #{desc => <<"Timeout in milliseconds">>, example => <<"15000">>}
)} )}
]; ];
fields(invitation_view) -> fields(invitation_status) ->
[ [
{succeed, {succeed,
?HOCON( ?HOCON(
@ -208,7 +208,8 @@ fields(node_invitation_succeed) ->
?HOCON( ?HOCON(
emqx_utils_calendar:epoch_millisecond(), emqx_utils_calendar:epoch_millisecond(),
#{ #{
desc => <<"The time of the async invitation result is received, millisecond precision epoch">>, desc =>
<<"The time of the async invitation result is received, millisecond precision epoch">>,
example => <<"1705044829915">> example => <<"1705044829915">>
} }
)} )}
@ -224,7 +225,8 @@ fields(node_invitation_in_progress) ->
?HOCON( ?HOCON(
emqx_utils_calendar:epoch_millisecond(), emqx_utils_calendar:epoch_millisecond(),
#{ #{
desc => <<"The start timestamp of the invitation, millisecond precision epoch">>, desc =>
<<"The start timestamp of the invitation, millisecond precision epoch">>,
example => <<"1705044829915">> example => <<"1705044829915">>
} }
)} )}
@ -319,8 +321,8 @@ invite_node_async(put, #{bindings := #{node := Node0}}) ->
}} }}
end. end.
get_invitation_view(get, _) -> get_invitation_status(get, _) ->
{200, format_invitation_view(emqx_mgmt_cluster:invitation_view())}. {200, format_invitation_status(emqx_mgmt_cluster:invitation_status())}.
force_leave(delete, #{bindings := #{node := Node0}}) -> force_leave(delete, #{bindings := #{node := Node0}}) ->
Node = ekka_node:parse_name(binary_to_list(Node0)), Node = ekka_node:parse_name(binary_to_list(Node0)),
@ -344,7 +346,7 @@ connected_replicants() ->
error_message(Msg) -> error_message(Msg) ->
iolist_to_binary(io_lib:format("~p", [Msg])). iolist_to_binary(io_lib:format("~p", [Msg])).
format_invitation_view(#{ format_invitation_status(#{
succeed := Succeed, succeed := Succeed,
in_progress := InProgress, in_progress := InProgress,
failed := Failed failed := Failed

View File

@ -21,7 +21,7 @@
%% APIs %% APIs
-export([start_link/0]). -export([start_link/0]).
-export([invite_async/1, invitation_view/0]). -export([invite_async/1, invitation_status/0]).
%% gen_server callbacks %% gen_server callbacks
-export([ -export([
@ -42,17 +42,19 @@ start_link() ->
-spec invite_async(atom()) -> ok | ignore | {error, {already_started, pid()}}. -spec invite_async(atom()) -> ok | ignore | {error, {already_started, pid()}}.
invite_async(Node) -> invite_async(Node) ->
JoinTo = node(), %% Proxy the invitation task to the leader node
JoinTo = mria_membership:leader(),
case Node =/= JoinTo of case Node =/= JoinTo of
true -> true ->
gen_server:call(?MODULE, {invite_async, Node, JoinTo}, infinity); gen_server:call({?MODULE, JoinTo}, {invite_async, Node, JoinTo}, infinity);
false -> false ->
ignore ignore
end. end.
-spec invitation_view() -> map(). -spec invitation_status() -> map().
invitation_view() -> invitation_status() ->
gen_server:call(?MODULE, invitation_view, infinity). Leader = mria_membership:leader(),
gen_server:call({?MODULE, Leader}, invitation_status, infinity).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% gen_server callbacks %% gen_server callbacks
@ -71,8 +73,8 @@ handle_call({invite_async, Node, JoinTo}, _From, State) ->
WorkerPid -> WorkerPid ->
{reply, {error, {already_started, WorkerPid}}, State} {reply, {error, {already_started, WorkerPid}}, State}
end; end;
handle_call(invitation_view, _From, State) -> handle_call(invitation_status, _From, State) ->
{reply, state_to_invitation_view(State), State}; {reply, state_to_invitation_status(State), State};
handle_call(_Request, _From, State) -> handle_call(_Request, _From, State) ->
Reply = ok, Reply = ok,
{reply, Reply, State}. {reply, Reply, State}.
@ -155,7 +157,7 @@ find_node_name_via_worker_pid(WorkerPid, {Key, Task, I}) ->
find_node_name_via_worker_pid(WorkerPid, maps:next(I)) find_node_name_via_worker_pid(WorkerPid, maps:next(I))
end. end.
state_to_invitation_view(State) -> state_to_invitation_status(State) ->
History = maps:get(history, State, #{}), History = maps:get(history, State, #{}),
{Succ, Failed} = lists:foldl( {Succ, Failed} = lists:foldl(
fun({Node, Task}, {SuccAcc, FailedAcc}) -> fun({Node, Task}, {SuccAcc, FailedAcc}) ->

View File

@ -141,11 +141,11 @@ t_cluster_invite_api_timeout(Config) ->
) )
end, end,
?assertMatch( ?assertMatch(
{400, #{code := 'BAD_REQUEST', message := <<"timeout must be integer">>}}, {400, #{code := 'BAD_REQUEST', message := <<"timeout must be an integer">>}},
Invite(Core2, not_a_integer_timeout) Invite(Core2, not_a_integer_timeout)
), ),
?assertMatch( ?assertMatch(
{400, #{code := 'BAD_REQUEST', message := <<"timeout can't less than 5000ms">>}}, {400, #{code := 'BAD_REQUEST', message := <<"timeout cannot be less than 5000ms">>}},
Invite(Core2, 3000) Invite(Core2, 3000)
), ),
@ -240,7 +240,7 @@ t_cluster_invite_async(Config) ->
%% assert: core2 is in_progress status %% assert: core2 is in_progress status
?assertMatch( ?assertMatch(
{200, #{in_progress := [#{node := Core2}]}}, {200, #{in_progress := [#{node := Core2}]}},
rpc:call(Core1, emqx_mgmt_api_cluster, get_invitation_view, [get, #{}]) rpc:call(Core1, emqx_mgmt_api_cluster, get_invitation_status, [get, #{}])
), ),
%% waiting the async invitation_succeed %% waiting the async invitation_succeed
@ -295,7 +295,7 @@ waiting_the_async_invitation_succeed(Node, TargetNode, N) ->
in_progress := InProgress, in_progress := InProgress,
succeed := Succeed, succeed := Succeed,
failed := Failed failed := Failed
}} = rpc:call(Node, emqx_mgmt_api_cluster, get_invitation_view, [get, #{}]), }} = rpc:call(Node, emqx_mgmt_api_cluster, get_invitation_status, [get, #{}]),
case find_node_info_list(TargetNode, InProgress) of case find_node_info_list(TargetNode, InProgress) of
error -> error ->
case find_node_info_list(TargetNode, Succeed) of case find_node_info_list(TargetNode, Succeed) of

View File

@ -25,4 +25,9 @@ force_remove_node.desc:
force_remove_node.label: force_remove_node.label:
"""Force leave node from cluster""" """Force leave node from cluster"""
get_invitation_status.desc:
"""Get the execution status of all asynchronous invite node tasks"""
get_invitation_status.label:
"""Get status of all invitation tasks"""
} }