From 6ff4c560e430934acb74870214ef8959cf7dcd1e Mon Sep 17 00:00:00 2001 From: JianBo He Date: Fri, 12 Jan 2024 13:26:39 +0800 Subject: [PATCH] feat: support invite node in async mananer --- .../src/emqx_mgmt_api_cluster.erl | 127 ++++++++++++ .../emqx_management/src/emqx_mgmt_cluster.erl | 196 ++++++++++++++++++ apps/emqx_management/src/emqx_mgmt_sup.erl | 3 +- .../src/proto/emqx_mgmt_cluster_proto_v3.erl | 2 +- .../test/emqx_mgmt_api_cluster_SUITE.erl | 134 +++++++++++- rel/i18n/emqx_mgmt_api_cluster.hocon | 5 + 6 files changed, 464 insertions(+), 3 deletions(-) create mode 100644 apps/emqx_management/src/emqx_mgmt_cluster.erl diff --git a/apps/emqx_management/src/emqx_mgmt_api_cluster.erl b/apps/emqx_management/src/emqx_mgmt_api_cluster.erl index 0a9a17ea2..1a46c0b36 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_cluster.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_cluster.erl @@ -26,6 +26,8 @@ cluster_info/2, cluster_topology/2, invite_node/2, + invite_node_async/2, + get_invitation_view/2, force_leave/2, join/1, connected_replicants/0 @@ -42,7 +44,9 @@ paths() -> [ "/cluster", "/cluster/topology", + "/cluster/invitation", "/cluster/:node/invite", + "/cluster/:node/invite_async", "/cluster/:node/force_leave" ]. @@ -72,6 +76,20 @@ schema("/cluster/topology") -> } } }; +schema("/cluster/invitation") -> + #{ + 'operationId' => get_invitation_view, + get => #{ + desc => ?DESC(get_invitation_view), + tags => [<<"Cluster">>], + responses => #{ + 200 => ?HOCON( + ?REF(invitation_view), + #{desc => <<"Get invitation progress created by async operation">>} + ) + } + } + }; schema("/cluster/:node/invite") -> #{ 'operationId' => invite_node, @@ -86,6 +104,19 @@ schema("/cluster/:node/invite") -> } } }; +schema("/cluster/:node/invite_async") -> + #{ + 'operationId' => invite_node_async, + put => #{ + desc => ?DESC(invite_node_async), + tags => [<<"Cluster">>], + parameters => [hoconsc:ref(node)], + responses => #{ + 200 => <<"ok">>, + 400 => emqx_dashboard_swagger:error_codes(['BAD_REQUEST']) + } + } + }; schema("/cluster/:node/force_leave") -> #{ 'operationId' => force_leave, @@ -142,6 +173,61 @@ fields(timeout) -> non_neg_integer(), #{desc => <<"Timeout in milliseconds">>, example => <<"15000">>} )} + ]; +fields(invitation_view) -> + [ + {succeed, + ?HOCON( + ?ARRAY(?REF(node_invitation_succeed)), + #{desc => <<"A list of information about nodes that were successfully invited">>} + )}, + {in_progress, + ?HOCON( + ?ARRAY(?REF(node_invitation_in_progress)), + #{desc => <<"A list of information about nodes that are processing invitations">>} + )}, + {failed, + ?HOCON( + ?ARRAY(?REF(node_invitation_failed)), + #{desc => <<"A list of information about nodes that failed to be invited">>} + )} + ]; +fields(node_invitation_failed) -> + fields(node_invitation_succeed) ++ + [ + {reason, + ?HOCON( + binary(), + #{desc => <<"Failed reason">>, example => <<"Bad RPC to target node">>} + )} + ]; +fields(node_invitation_succeed) -> + fields(node_invitation_in_progress) ++ + [ + {finished_at, + ?HOCON( + emqx_utils_calendar:epoch_millisecond(), + #{ + desc => <<"The end time of the invitation task, in millisecond">>, + example => <<"1705044829915">> + } + )} + ]; +fields(node_invitation_in_progress) -> + [ + {node, + ?HOCON( + binary(), + #{desc => <<"Node name">>, example => <<"emqx2@127.0.0.1">>} + )}, + {started_at, + ?HOCON( + emqx_utils_calendar:epoch_millisecond(), + #{ + desc => <<"The start time of the invitation task, in millisecond">>, + example => <<"1705044829915">> + } + )} ]. validate_node(Node) -> @@ -219,6 +305,23 @@ invite_node(put, #{bindings := #{node := Node0}, body := Body}) -> end end. +invite_node_async(put, #{bindings := #{node := Node0}}) -> + Node = ekka_node:parse_name(binary_to_list(Node0)), + case emqx_mgmt_cluster:invite_async(Node) of + ok -> + {200}; + ignore -> + {400, #{code => 'BAD_REQUEST', message => <<"Can't invite self">>}}; + {error, {already_started, _Pid}} -> + {400, #{ + code => 'BAD_REQUEST', + message => <<"The invitation task already created for this node">> + }} + end. + +get_invitation_view(get, _) -> + {200, format_invitation_view(emqx_mgmt_cluster:invitation_view())}. + force_leave(delete, #{bindings := #{node := Node0}}) -> Node = ekka_node:parse_name(binary_to_list(Node0)), case ekka:force_leave(Node) of @@ -240,3 +343,27 @@ connected_replicants() -> error_message(Msg) -> iolist_to_binary(io_lib:format("~p", [Msg])). + +format_invitation_view(#{ + succeed := Succeed, + in_progress := InProgress, + failed := Failed +}) -> + #{ + succeed => format_invitation_info(Succeed), + in_progress => format_invitation_info(InProgress), + failed => format_invitation_info(Failed) + }. + +format_invitation_info(L) when is_list(L) -> + lists:map( + fun(Info) -> + Info1 = emqx_utils_maps:update_if_present( + started_at, fun emqx_utils_calendar:epoch_to_rfc3339/1, Info + ), + emqx_utils_maps:update_if_present( + finished_at, fun emqx_utils_calendar:epoch_to_rfc3339/1, Info1 + ) + end, + L + ). diff --git a/apps/emqx_management/src/emqx_mgmt_cluster.erl b/apps/emqx_management/src/emqx_mgmt_cluster.erl new file mode 100644 index 000000000..b5dfaae93 --- /dev/null +++ b/apps/emqx_management/src/emqx_mgmt_cluster.erl @@ -0,0 +1,196 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_mgmt_cluster). + +-behaviour(gen_server). + +%% APIs +-export([start_link/0]). + +-export([invite_async/1, invitation_view/0]). + +%% gen_server callbacks +-export([ + init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3 +]). + +%%-------------------------------------------------------------------- +%% APIs +%%-------------------------------------------------------------------- + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +-spec invite_async(atom()) -> ok | ignore | {badrpc, any()}. +invite_async(Node) -> + JoinTo = node(), + case Node =/= JoinTo of + true -> + gen_server:call(?MODULE, {invite_async, Node, JoinTo}); + false -> + ignore + end. + +-spec invitation_view() -> map(). +invitation_view() -> + gen_server:call(?MODULE, invitation_view). + +%%-------------------------------------------------------------------- +%% gen_server callbacks +%%-------------------------------------------------------------------- + +init([]) -> + process_flag(trap_exit, true), + {ok, #{}}. + +handle_call({invite_async, Node, JoinTo}, _From, State) -> + case maps:get(Node, State, undefined) of + undefined -> + Caller = self(), + Task = spawn_link_invite_worker(Node, JoinTo, Caller), + {reply, ok, State#{Node => Task}}; + WorkerPid -> + {reply, {error, {already_started, WorkerPid}}, State} + end; +handle_call(invitation_view, _From, State) -> + {reply, state_to_invitation_view(State), State}; +handle_call(_Request, _From, State) -> + Reply = ok, + {reply, Reply, State}. + +handle_cast(_Msg, State) -> + {noreply, State}. + +handle_info({task_done, _WorkerPid, Node, Result}, State) -> + case maps:take(Node, State) of + {Task, State1} -> + History = maps:get(history, State1, #{}), + Task1 = Task#{ + result => Result, + finished_at => erlang:system_time(millisecond) + }, + {noreply, State1#{history => History#{Node => Task1}}}; + error -> + {noreply, State} + end; +handle_info({'EXIT', WorkerPid, Reason}, State) -> + case take_node_name_via_worker_pid(WorkerPid, State) of + {key_value, Node, Task, State1} -> + History = maps:get(history, State1, #{}), + Task1 = Task#{ + result => {error, Reason}, + finished_at => erlang:system_time(millisecond) + }, + {noreply, State1#{history => History#{Node => Task1}}}; + error -> + {noreply, State} + end; +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%-------------------------------------------------------------------- +%% Internal funcs +%%-------------------------------------------------------------------- + +spawn_link_invite_worker(Node, JoinTo, Caller) -> + Pid = erlang:spawn_link( + fun() -> + Result = + case emqx_mgmt_cluster_proto_v3:invite_node(Node, JoinTo, infinity) of + ok -> + ok; + {error, {already_in_cluster, _Node}} -> + ok; + {error, _} = E -> + E; + {badrpc, Reason} -> + {error, {badrpc, Reason}} + end, + Caller ! {task_done, self(), Node, Result} + end + ), + #{worker => Pid, started_at => erlang:system_time(millisecond)}. + +take_node_name_via_worker_pid(WorkerPid, Map) when is_map(Map) -> + Key = find_node_name_via_worker_pid(WorkerPid, maps:next(maps:iterator(Map))), + case maps:take(Key, Map) of + error -> + error; + {Vaule, Map1} -> + {key_value, Key, Vaule, Map1} + end. + +find_node_name_via_worker_pid(_WorkerPid, none) -> + error; +find_node_name_via_worker_pid(WorkerPid, {Key, Task, I}) -> + case maps:get(worker, Task, undefined) of + WorkerPid -> + Key; + _ -> + find_node_name_via_worker_pid(WorkerPid, maps:next(I)) + end. + +state_to_invitation_view(State) -> + History = maps:get(history, State, #{}), + {Succ, Failed} = lists:foldl( + fun({Node, Task}, {SuccAcc, FailedAcc}) -> + #{ + started_at := StartedAt, + finished_at := FinishedAt, + result := Result + } = Task, + Ret = #{node => Node, started_at => StartedAt, finished_at => FinishedAt}, + case is_succeed_result(Result) of + true -> + {[Ret | SuccAcc], FailedAcc}; + false -> + {SuccAcc, [Ret#{reason => Result} | FailedAcc]} + end + end, + {[], []}, + maps:to_list(History) + ), + + InPro = maps:fold( + fun(Node, _Task = #{started_at := StartedAt}, Acc) -> + [#{node => Node, started_at => StartedAt} | Acc] + end, + [], + maps:without([history], State) + ), + #{succeed => Succ, in_progress => InPro, failed => Failed}. + +is_succeed_result(Result) -> + case Result of + ok -> + true; + {error, {already_in_cluster, _Node}} -> + true; + _ -> + false + end. diff --git a/apps/emqx_management/src/emqx_mgmt_sup.erl b/apps/emqx_management/src/emqx_mgmt_sup.erl index 713ff87dc..5bd8632c3 100644 --- a/apps/emqx_management/src/emqx_mgmt_sup.erl +++ b/apps/emqx_management/src/emqx_mgmt_sup.erl @@ -33,7 +33,8 @@ init([]) -> _ -> [] end, - {ok, {{one_for_one, 1, 5}, Workers}}. + Cluster = child_spec(emqx_mgmt_cluster, 5000, worker), + {ok, {{one_for_one, 1, 5}, [Cluster | Workers]}}. child_spec(Mod, Shutdown, Type) -> #{ diff --git a/apps/emqx_management/src/proto/emqx_mgmt_cluster_proto_v3.erl b/apps/emqx_management/src/proto/emqx_mgmt_cluster_proto_v3.erl index 8110ac2cb..b00d63e1d 100644 --- a/apps/emqx_management/src/proto/emqx_mgmt_cluster_proto_v3.erl +++ b/apps/emqx_management/src/proto/emqx_mgmt_cluster_proto_v3.erl @@ -30,7 +30,7 @@ introduced_in() -> "5.5.0". -spec invite_node(node(), node(), timeout()) -> ok | ignore | {error, term()} | emqx_rpc:badrpc(). -invite_node(Node, Self, Timeout) when is_integer(Timeout) -> +invite_node(Node, Self, Timeout) when is_integer(Timeout); Timeout =:= infinity -> rpc:call(Node, emqx_mgmt_api_cluster, join, [Self], Timeout). -spec connected_replicants([node()]) -> emqx_rpc:multicall_result(). diff --git a/apps/emqx_management/test/emqx_mgmt_api_cluster_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_cluster_SUITE.erl index 3d4124b28..d195b04e1 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_cluster_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_cluster_SUITE.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -38,6 +38,9 @@ init_per_testcase(TC = t_cluster_topology_api_replicants, Config0) -> init_per_testcase(TC = t_cluster_invite_api_timeout, Config0) -> Config = [{tc_name, TC} | Config0], [{cluster, cluster(Config)} | setup(Config)]; +init_per_testcase(TC = t_cluster_invite_async, Config0) -> + Config = [{tc_name, TC} | Config0], + [{cluster, cluster(Config)} | setup(Config)]; init_per_testcase(_TC, Config) -> emqx_mgmt_api_test_util:init_suite(?APPS), Config. @@ -48,6 +51,9 @@ end_per_testcase(t_cluster_topology_api_replicants, Config) -> end_per_testcase(t_cluster_invite_api_timeout, Config) -> emqx_cth_cluster:stop(?config(cluster, Config)), cleanup(Config); +end_per_testcase(t_cluster_invite_async, Config) -> + emqx_cth_cluster:stop(?config(cluster, Config)), + cleanup(Config); end_per_testcase(_TC, _Config) -> emqx_mgmt_api_test_util:end_suite(?APPS). @@ -164,6 +170,98 @@ t_cluster_invite_api_timeout(Config) -> lists:sort(Core1Resp3) ). +t_cluster_invite_async(Config) -> + %% assert the cluster is created + [Core1, Core2, Replicant] = _NodesList = ?config(cluster, Config), + {200, Core1Resp} = rpc:call(Core1, emqx_mgmt_api_cluster, cluster_topology, [get, #{}]), + ?assertMatch( + [ + #{ + core_node := Core1, + replicant_nodes := + [#{node := Replicant, streams := _}] + }, + #{ + core_node := Core2, + replicant_nodes := + [#{node := Replicant, streams := _}] + } + ], + lists:sort(Core1Resp) + ), + + %% force leave the core2 and replicant + {204} = rpc:call( + Core1, + emqx_mgmt_api_cluster, + force_leave, + [delete, #{bindings => #{node => atom_to_binary(Core2)}}] + ), + %% assert the cluster is updated + {200, Core1Resp2} = rpc:call(Core1, emqx_mgmt_api_cluster, cluster_topology, [get, #{}]), + ?assertMatch( + [ + #{ + core_node := Core1, + replicant_nodes := [_] + } + ], + lists:sort(Core1Resp2) + ), + + Invite = fun(Node) -> + Node1 = atom_to_binary(Node), + rpc:call( + Core1, + emqx_mgmt_api_cluster, + invite_node_async, + [put, #{bindings => #{node => Node1}}] + ) + end, + + %% parameter checking + ?assertMatch( + {400, #{code := 'BAD_REQUEST', message := <<"Can't invite self">>}}, + Invite(Core1) + ), + ?assertMatch( + {200}, + Invite(Core2) + ), + %% already invited + ?assertMatch( + {400, #{ + code := 'BAD_REQUEST', + message := <<"The invitation task already created for this node">> + }}, + Invite(Core2) + ), + + %% assert: core2 is in_progress status + ?assertMatch( + {200, #{in_progress := [#{node := Core2}]}}, + rpc:call(Core1, emqx_mgmt_api_cluster, get_invitation_view, [get, #{}]) + ), + + %% waiting the async invitation_succeed + ?assertMatch({succeed, _}, waiting_the_async_invitation_succeed(Core1, Core2)), + + {200, Core1Resp3} = rpc:call(Core1, emqx_mgmt_api_cluster, cluster_topology, [get, #{}]), + ?assertMatch( + [ + #{ + core_node := Core1, + replicant_nodes := + [#{node := Replicant, streams := _}] + }, + #{ + core_node := Core2, + replicant_nodes := _ + } + ], + lists:sort(Core1Resp3) + ). + cluster(Config) -> NodeSpec = #{apps => ?APPS}, Nodes = emqx_cth_cluster:start( @@ -186,3 +284,37 @@ cleanup(Config) -> work_dir(Config) -> filename:join(?config(priv_dir, Config), ?config(tc_name, Config)). + +waiting_the_async_invitation_succeed(Node, TargetNode) -> + waiting_the_async_invitation_succeed(Node, TargetNode, 100). + +waiting_the_async_invitation_succeed(_Node, _TargetNode, 0) -> + error(timeout); +waiting_the_async_invitation_succeed(Node, TargetNode, N) -> + {200, #{ + in_progress := InProgress, + succeed := Succeed, + failed := Failed + }} = rpc:call(Node, emqx_mgmt_api_cluster, get_invitation_view, [get, #{}]), + case find_node_info_list(TargetNode, InProgress) of + error -> + case find_node_info_list(TargetNode, Succeed) of + error -> + case find_node_info_list(TargetNode, Failed) of + error -> error; + Info1 -> {failed, Info1} + end; + Info2 -> + {succeed, Info2} + end; + _Info -> + timer:sleep(1000), + waiting_the_async_invitation_succeed(Node, TargetNode, N - 1) + end. + +find_node_info_list(Node, List) -> + L = lists:filter(fun(#{node := N}) -> N =:= Node end, List), + case L of + [] -> error; + [Info] -> Info + end. diff --git a/rel/i18n/emqx_mgmt_api_cluster.hocon b/rel/i18n/emqx_mgmt_api_cluster.hocon index 0e6fceba1..9b7d782fe 100644 --- a/rel/i18n/emqx_mgmt_api_cluster.hocon +++ b/rel/i18n/emqx_mgmt_api_cluster.hocon @@ -15,6 +15,11 @@ invite_node.desc: invite_node.label: """Invite node to cluster""" +invite_node_async.desc: +"""Asynchronously invite the target node to join the cluster""" +invite_node_async.label: +"""Asynchronously invite the target node to join the cluster""" + force_remove_node.desc: """Force leave node from cluster""" force_remove_node.label: