%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%%     http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
|
|
|
|
%% A gen_server that runs cluster invitations in linked worker processes and
%% keeps track of which invitations are in progress, succeeded or failed.
-module(emqx_mgmt_cluster).

-behaviour(gen_server).

%% APIs
-export([
    start_link/0,
    invite_async/1,
    invitation_status/0
]).

%% gen_server callbacks
-export([
    init/1,
    handle_call/3,
    handle_cast/2,
    handle_info/2,
    terminate/2,
    code_change/3
]).
|
|
|
|
%%--------------------------------------------------------------------
%% APIs
%%--------------------------------------------------------------------
|
|
|
|
%% Start the server and register it locally under the module name, which is
%% the name the API functions use to address it on the leader node.
start_link() ->
    ServerName = {local, ?MODULE},
    gen_server:start_link(ServerName, ?MODULE, [], []).
|
|
|
|
%% Ask the cluster leader to invite `Node' in the background.
%%
%% The request is proxied to the `?MODULE' server running on the node returned
%% by `mria_membership:leader/0', and the caller blocks (no timeout) until that
%% server replies. Returns `ignore' without contacting anyone when `Node' is
%% the leader itself.
-spec invite_async(atom()) -> ok | ignore | {error, {already_started, pid()}}.
invite_async(Node) ->
    case mria_membership:leader() of
        Node ->
            %% The leader would be inviting itself; nothing to do.
            ignore;
        Leader ->
            gen_server:call({?MODULE, Leader}, {invite_async, Node, Leader}, infinity)
    end.
|
|
|
|
%% Fetch the invitation summary from the `?MODULE' server on the leader node
%% (as reported by `mria_membership:leader/0'). Blocks without a timeout.
-spec invitation_status() -> map().
invitation_status() ->
    gen_server:call({?MODULE, mria_membership:leader()}, invitation_status, infinity).
|
|
|
|
%%--------------------------------------------------------------------
%% gen_server callbacks
%%--------------------------------------------------------------------
|
|
|
|
%% Initialise the server with an empty task map.
init([]) ->
    %% Trap exits so that the death of a linked invite worker is delivered as
    %% an 'EXIT' message (see handle_info/2) instead of killing this server.
    process_flag(trap_exit, true),
    NoTasks = #{},
    {ok, NoTasks}.
|
|
|
|
%% Synchronous requests.
%%
%% `{invite_async, Node, JoinTo}': start a linked worker that invites `Node'
%% to join `JoinTo', unless an invitation for `Node' is already running.
%% Replies `ok' when a worker was started, or
%% `{error, {already_started, WorkerPid}}' with the pid of the running worker.
%% Starting a new invitation also drops any finished record for `Node' from
%% the history.
%%
%% `invitation_status': replies with the summary built by
%% `state_to_invitation_status/1'.
%%
%% Any other request is answered with `ok'.
handle_call({invite_async, Node, JoinTo}, _From, State) ->
    case maps:get(Node, State, undefined) of
        undefined ->
            %% The worker reports its result back to this server process.
            Task = spawn_link_invite_worker(Node, JoinTo, self()),
            State1 = remove_finished_task(Node, State),
            {reply, ok, State1#{Node => Task}};
        #{worker := WorkerPid} ->
            %% The state holds a task map per node; reply with the worker pid
            %% only, which is what the spec of invite_async/1 promises.
            {reply, {error, {already_started, WorkerPid}}, State}
    end;
handle_call(invitation_status, _From, State) ->
    {reply, state_to_invitation_status(State), State};
handle_call(_Request, _From, State) ->
    {reply, ok, State}.
|
|
|
|
%% No casts are part of this server's protocol; every cast is ignored and the
%% state is left untouched.
handle_cast(_Ignored, Tasks) ->
    {noreply, Tasks}.
|
|
|
|
%% Asynchronous messages.
%%
%% `{task_done, WorkerPid, Node, Result}': a worker finished; its task is moved
%% from the in-progress entries into the history together with `Result'.
%%
%% `{'EXIT', WorkerPid, Reason}': a linked process exited. If it is the worker
%% of a task that is still in progress, the task is moved into the history
%% with the result `{error, Reason}'.
%%
%% Messages that match no known task, and all other messages, are ignored.
handle_info({task_done, _WorkerPid, Node, Result}, State) ->
    case maps:take(Node, State) of
        error ->
            {noreply, State};
        {Task, Remaining} ->
            {noreply, move_task_to_history(Node, Task, Result, Remaining)}
    end;
handle_info({'EXIT', WorkerPid, Reason}, State) ->
    case take_node_name_via_worker_pid(WorkerPid, State) of
        error ->
            {noreply, State};
        {key_value, Node, Task, Remaining} ->
            {noreply, move_task_to_history(Node, Task, {error, Reason}, Remaining)}
    end;
handle_info(_Info, State) ->
    {noreply, State}.

%% Stamp `Task' with its result and finish time (milliseconds, system time)
%% and store it under `Node' in the `history' entry of `State'.
move_task_to_history(Node, Task, Result, State) ->
    History = maps:get(history, State, #{}),
    Finished = Task#{
        result => Result,
        finished_at => erlang:system_time(millisecond)
    },
    State#{history => History#{Node => Finished}}.
|
|
|
|
%% Nothing to clean up on shutdown.
terminate(_Why, _Tasks) ->
    ok.
|
|
|
|
%% The state needs no conversion on code upgrade; it is returned unchanged.
code_change(_FromVsn, Tasks, _Extra) ->
    {ok, Tasks}.
|
|
|
|
%%--------------------------------------------------------------------
%% Internal funcs
%%--------------------------------------------------------------------
|
|
|
|
%% Spawn a linked worker that invites `Node' to join `JoinTo' and then sends
%% `{task_done, WorkerPid, Node, Result}' to `Caller'.
%%
%% `Result' is `ok' when the invitation succeeded or the node was already in
%% the cluster, and `{error, _}' otherwise (a `{badrpc, Reason}' reply is
%% wrapped as `{error, {badrpc, Reason}}').
%%
%% Returns the task record for the new worker: its pid and the start time in
%% milliseconds (system time).
spawn_link_invite_worker(Node, JoinTo, Caller) ->
    InviteAndReport = fun() ->
        Outcome =
            case emqx_mgmt_cluster_proto_v3:invite_node(Node, JoinTo, infinity) of
                ok ->
                    ok;
                {error, {already_in_cluster, _}} ->
                    %% Already a member: treated as a successful invitation.
                    ok;
                {error, _} = Failure ->
                    Failure;
                {badrpc, Why} ->
                    {error, {badrpc, Why}}
            end,
        Caller ! {task_done, self(), Node, Outcome}
    end,
    WorkerPid = erlang:spawn_link(InviteAndReport),
    #{worker => WorkerPid, started_at => erlang:system_time(millisecond)}.
|
|
|
|
%% Find the entry of `Map' whose task was started with worker `WorkerPid' and
%% remove it.
%%
%% Returns `{key_value, Key, Task, MapWithoutKey}' when such an entry exists,
%% and `error' otherwise.
take_node_name_via_worker_pid(WorkerPid, Map) when is_map(Map) ->
    case find_node_name_via_worker_pid(WorkerPid, maps:next(maps:iterator(Map))) of
        {ok, Key} ->
            {Value, Map1} = maps:take(Key, Map),
            {key_value, Key, Value, Map1};
        error ->
            %% Report "not found" directly. The sentinel must never be looked
            %% up as a key, or a map that happens to contain the key `error'
            %% would produce a bogus hit.
            error
    end.

%% Walk a map iterator step (`none' or `{Key, Value, NextIterator}') and return
%% `{ok, Key}' for the first value that is a map with `worker := WorkerPid',
%% or `error' when the iterator is exhausted. Values of any other shape are
%% skipped.
find_node_name_via_worker_pid(_WorkerPid, none) ->
    error;
find_node_name_via_worker_pid(WorkerPid, {Key, #{worker := WorkerPid}, _I}) ->
    {ok, Key};
find_node_name_via_worker_pid(WorkerPid, {_Key, _Value, I}) ->
    find_node_name_via_worker_pid(WorkerPid, maps:next(I)).
|
|
|
|
%% Drop the finished record of `Node' from the `history' entry of `State'.
%% A state without a `history' entry is returned unchanged.
remove_finished_task(Node, State) ->
    case State of
        #{history := Finished} ->
            State#{history => maps:remove(Node, Finished)};
        _ ->
            State
    end.
|
|
|
|
%% Build the invitation summary from the server state.
%%
%% Finished tasks (the `history' entry) are split into `succeed' and `failed'
%% lists according to `is_succeed_result/1'; failed ones additionally carry a
%% printable `reason'. Every other entry of the state is reported under
%% `in_progress' with its node name and start time.
state_to_invitation_status(State) ->
    FinishedTasks = maps:to_list(maps:get(history, State, #{})),
    {Succ, Failed} = lists:foldl(fun classify_finished_task/2, {[], []}, FinishedTasks),
    RunningTasks = maps:without([history], State),
    InPro = maps:fold(
        fun(Node, #{started_at := StartedAt}, Acc) ->
            [#{node => Node, started_at => StartedAt} | Acc]
        end,
        [],
        RunningTasks
    ),
    #{succeed => Succ, in_progress => InPro, failed => Failed}.

%% Add the summary of one finished task to the matching accumulator list.
classify_finished_task({Node, Task}, {SuccAcc, FailedAcc}) ->
    #{
        started_at := StartedAt,
        finished_at := FinishedAt,
        result := Result
    } = Task,
    Summary = #{node => Node, started_at => StartedAt, finished_at => FinishedAt},
    case is_succeed_result(Result) of
        false ->
            {SuccAcc, [Summary#{reason => format_error_reason(Result)} | FailedAcc]};
        true ->
            {[Summary | SuccAcc], FailedAcc}
    end.
|
|
|
|
%% True when an invitation result counts as a success: a plain `ok', or an
%% error saying the node is already in the cluster. Anything else is a failure.
is_succeed_result(ok) ->
    true;
is_succeed_result({error, {already_in_cluster, _Node}}) ->
    true;
is_succeed_result(_Other) ->
    false.
|
|
|
|
%% Render any term as a binary using the `~0p' format directive.
format_error_reason(Term) ->
    Printed = io_lib:format("~0p", [Term]),
    erlang:iolist_to_binary(Printed).
|