%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%%     http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------

-module(emqx_mgmt_cluster).

-behaviour(gen_server).

%% APIs
-export([start_link/0]).
-export([invite_async/1, invitation_status/0]).

%% gen_server callbacks
-export([
    init/1,
    handle_call/3,
    handle_cast/2,
    handle_info/2,
    terminate/2,
    code_change/3
]).

%%--------------------------------------------------------------------
%% APIs
%%--------------------------------------------------------------------

%% @doc Start this server, registered locally under the module name.
start_link() ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

%% @doc Ask the current mria leader to invite `Node' into the cluster.
%% The request is forwarded to this server on the leader node, with the
%% leader itself as the join target. Returns `ignore' when `Node' already
%% is the leader.
-spec invite_async(atom()) -> ok | ignore | {error, {already_started, pid()}}.
invite_async(Node) ->
    case mria_membership:leader() of
        Node ->
            %% Nothing to do: the node to invite is the leader itself.
            ignore;
        Leader ->
            gen_server:call({?MODULE, Leader}, {invite_async, Node, Leader}, infinity)
    end.

%% @doc Fetch the invitation status summary kept by this server on the
%% current mria leader node.
-spec invitation_status() -> map().
invitation_status() ->
    gen_server:call({?MODULE, mria_membership:leader()}, invitation_status, infinity).

%%--------------------------------------------------------------------
%% gen_server callbacks
%%--------------------------------------------------------------------

%% Exits are trapped so that linked invite workers which die are delivered
%% as 'EXIT' messages rather than killing this server.
init([]) ->
    process_flag(trap_exit, true),
    InitialState = #{},
    {ok, InitialState}.
%% @doc Synchronous requests.
%%
%% `{invite_async, Node, JoinTo}': if `Node' has no task in progress, spawn a
%% linked worker that invites it to join `JoinTo', drop any finished record of
%% `Node' from `history', store the new task and reply `ok'. Otherwise reply
%% `{error, {already_started, WorkerPid}}'.
%%
%% `invitation_status': reply with the succeed/in_progress/failed summary.
handle_call({invite_async, Node, JoinTo}, _From, State) ->
    case maps:get(Node, State, undefined) of
        undefined ->
            Caller = self(),
            Task = spawn_link_invite_worker(Node, JoinTo, Caller),
            State1 = remove_finished_task(Node, State),
            {reply, ok, State1#{Node => Task}};
        #{worker := WorkerPid} ->
            %% Reply with the worker pid, not the whole task map, to honour the
            %% `{already_started, pid()}' contract of invite_async/1.
            {reply, {error, {already_started, WorkerPid}}, State}
    end;
handle_call(invitation_status, _From, State) ->
    {reply, state_to_invitation_status(State), State};
handle_call(_Request, _From, State) ->
    Reply = ok,
    {reply, Reply, State}.

handle_cast(_Msg, State) ->
    {noreply, State}.

%% @doc Asynchronous messages.
%%
%% `{task_done, WorkerPid, Node, Result}' is sent by a worker when it finishes.
%% The task is archived into `history' with its result.
%%
%% `{'EXIT', WorkerPid, Reason}' arrives because workers are linked and exits
%% are trapped.
handle_info({task_done, WorkerPid, Node, Result}, State) ->
    case maps:take(Node, State) of
        {#{worker := WorkerPid} = Task, State1} ->
            {noreply, archive_task(Node, Task, Result, State1)};
        _ ->
            %% Unknown node, or a report from a worker that does not own the
            %% current task for `Node': leave the state untouched.
            {noreply, State}
    end;
handle_info({'EXIT', WorkerPid, Reason}, State) ->
    %% A worker that already sent `task_done' has been archived, because its
    %% report is delivered before its exit signal. Only workers that exited
    %% without reporting are still found here.
    case take_node_name_via_worker_pid(WorkerPid, State) of
        {key_value, Node, Task, State1} ->
            {noreply, archive_task(Node, Task, {error, Reason}, State1)};
        error ->
            {noreply, State}
    end;
handle_info(_Info, State) ->
    {noreply, State}.

terminate(_Reason, _State) ->
    ok.

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

%%--------------------------------------------------------------------
%% Internal funcs
%%--------------------------------------------------------------------

%% Store `Task' for `Node' in the `history' map, stamped with its `Result'
%% and the current system time in milliseconds.
archive_task(Node, Task, Result, State) ->
    History = maps:get(history, State, #{}),
    Task1 = Task#{
        result => Result,
        finished_at => erlang:system_time(millisecond)
    },
    State#{history => History#{Node => Task1}}.

%% Spawn a worker linked to the calling process. The worker invites `Node'
%% to join `JoinTo' and reports `{task_done, self(), Node, Result}' to
%% `Caller'. Being already in the cluster counts as success.
%% Returns the task map `#{worker => Pid, started_at => Millis}'.
spawn_link_invite_worker(Node, JoinTo, Caller) ->
    Pid = erlang:spawn_link(
        fun() ->
            Result =
                case emqx_mgmt_cluster_proto_v3:invite_node(Node, JoinTo, infinity) of
                    ok ->
                        ok;
                    {error, {already_in_cluster, _Node}} ->
                        ok;
                    {error, _} = E ->
                        E;
                    {badrpc, Reason} ->
                        {error, {badrpc, Reason}}
                end,
            Caller ! {task_done, self(), Node, Result}
        end
    ),
    #{worker => Pid, started_at => erlang:system_time(millisecond)}.
%% Remove the in-progress task whose worker is `WorkerPid' from `Map'.
%% Returns `{key_value, Node, Task, Map1}', where `Map1' is `Map' without
%% `Node', or `error' when no in-progress task belongs to `WorkerPid'.
%% The `history' entry is not a task and is never inspected.
take_node_name_via_worker_pid(WorkerPid, Map) when is_map(Map) ->
    InProgress = maps:without([history], Map),
    case find_node_name_via_worker_pid(WorkerPid, maps:next(maps:iterator(InProgress))) of
        {ok, Node} ->
            {Task, Map1} = maps:take(Node, Map),
            {key_value, Node, Task, Map1};
        error ->
            error
    end.

%% Walk a maps iterator for the entry whose task has `worker' == `WorkerPid'.
%% A hit is wrapped as `{ok, Node}' so that it can never be mistaken for the
%% `error' miss marker, even when a key is literally the atom `error'.
find_node_name_via_worker_pid(_WorkerPid, none) ->
    error;
find_node_name_via_worker_pid(WorkerPid, {Node, #{worker := WorkerPid}, _I}) ->
    {ok, Node};
find_node_name_via_worker_pid(WorkerPid, {_Node, _Task, I}) ->
    find_node_name_via_worker_pid(WorkerPid, maps:next(I)).

%% Drop any finished record of `Node' from the `history' map, if present.
remove_finished_task(Node, State = #{history := History}) ->
    State#{history => maps:remove(Node, History)};
remove_finished_task(_Node, State) ->
    State.

%% Build the status summary `#{succeed, in_progress, failed}'.
%% Finished tasks come from `history'. Failed entries carry a formatted
%% `reason' binary. Every non-`history' key is an in-progress task.
state_to_invitation_status(State) ->
    History = maps:get(history, State, #{}),
    {Succ, Failed} = lists:foldl(
        fun({Node, Task}, {SuccAcc, FailedAcc}) ->
            #{
                started_at := StartedAt,
                finished_at := FinishedAt,
                result := Result
            } = Task,
            Ret = #{node => Node, started_at => StartedAt, finished_at => FinishedAt},
            case is_succeed_result(Result) of
                true ->
                    {[Ret | SuccAcc], FailedAcc};
                false ->
                    {SuccAcc, [Ret#{reason => format_error_reason(Result)} | FailedAcc]}
            end
        end,
        {[], []},
        maps:to_list(History)
    ),
    InPro = maps:fold(
        fun(Node, _Task = #{started_at := StartedAt}, Acc) ->
            [#{node => Node, started_at => StartedAt} | Acc]
        end,
        [],
        maps:without([history], State)
    ),
    #{succeed => Succ, in_progress => InPro, failed => Failed}.

%% `ok' and "already in cluster" both count as a successful invitation.
is_succeed_result(ok) -> true;
is_succeed_result({error, {already_in_cluster, _Node}}) -> true;
is_succeed_result(_) -> false.

%% Render an arbitrary error term as a single-line binary.
format_error_reason(Term) ->
    iolist_to_binary(io_lib:format("~0p", [Term])).