diff --git a/apps/emqx/priv/bpapi.versions b/apps/emqx/priv/bpapi.versions index 4c2fdd3eb..2d1ba49eb 100644 --- a/apps/emqx/priv/bpapi.versions +++ b/apps/emqx/priv/bpapi.versions @@ -32,6 +32,7 @@ {emqx_mgmt_api_plugins,1}. {emqx_mgmt_api_plugins,2}. {emqx_mgmt_cluster,1}. +{emqx_mgmt_cluster,2}. {emqx_mgmt_trace,1}. {emqx_mgmt_trace,2}. {emqx_node_rebalance,1}. diff --git a/apps/emqx_management/src/emqx_mgmt_api_cluster.erl b/apps/emqx_management/src/emqx_mgmt_api_cluster.erl index ba185e369..be1b8e354 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_cluster.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_cluster.erl @@ -19,9 +19,17 @@ -include_lib("typerefl/include/types.hrl"). -include_lib("hocon/include/hoconsc.hrl"). +-include_lib("emqx/include/logger.hrl"). -export([api_spec/0, fields/1, paths/0, schema/1, namespace/0]). --export([cluster_info/2, invite_node/2, force_leave/2, join/1]). +-export([ + cluster_info/2, + cluster_topology/2, + invite_node/2, + force_leave/2, + join/1, + connected_replicants/0 +]). namespace() -> "cluster". @@ -31,6 +39,7 @@ api_spec() -> paths() -> [ "/cluster", + "/cluster/topology", "/cluster/:node/invite", "/cluster/:node/force_leave" ]. @@ -50,6 +59,17 @@ schema("/cluster") -> } } }; +schema("/cluster/topology") -> + #{ + 'operationId' => cluster_topology, + get => #{ + desc => ?DESC(get_cluster_topology), + tags => [<<"Cluster">>], + responses => #{ + 200 => ?HOCON(?ARRAY(?REF(core_replicants)), #{desc => <<"Cluster topology">>}) + } + } + }; schema("/cluster/:node/invite") -> #{ 'operationId' => invite_node, @@ -89,6 +109,28 @@ fields(node) -> validator => fun validate_node/1 } )} + ]; +fields(replicant_info) -> + [ + {node, + ?HOCON( + atom(), + #{desc => <<"Replicant node name">>, example => <<"emqx-replicant@127.0.0.2">>} + )}, + {streams, + ?HOCON( + non_neg_integer(), + #{desc => <<"The number of RLOG (replicated log) streams">>, example => <<"10">>} + )} + ]; +fields(core_replicants) -> + [ + {core_node, + ?HOCON( + atom(), + #{desc => <<"Core node name">>, example => <<"emqx-core@127.0.0.1">>} + )}, + {replicant_nodes, ?HOCON(?ARRAY(?REF(replicant_info)))} ]. validate_node(Node) -> @@ -106,6 +148,46 @@ cluster_info(get, _) -> }, {200, Info}. +cluster_topology(get, _) -> + RunningCores = running_cores(), + {Replicants, BadNodes} = emqx_mgmt_cluster_proto_v2:connected_replicants(RunningCores), + CoreReplicants = lists:zip( + lists:filter( + fun(N) -> not lists:member(N, BadNodes) end, + RunningCores + ), + Replicants + ), + Topology = lists:map( + fun + ({Core, {badrpc, Reason}}) -> + ?SLOG(error, #{ + msg => "failed_to_get_replicant_nodes", + core_node => Core, + reason => Reason + }), + #{core_node => Core, replicant_nodes => []}; + ({Core, Repls}) -> + #{core_node => Core, replicant_nodes => format_replicants(Repls)} + end, + CoreReplicants + ), + BadNodes =/= [] andalso ?SLOG(error, #{msg => "rpc_call_failed", bad_nodes => BadNodes}), + {200, Topology}. + +format_replicants(Replicants) -> + maps:fold( + fun(K, V, Acc) -> + [#{node => K, streams => length(V)} | Acc] + end, + [], + maps:groups_from_list(fun({_, N, _}) -> N end, Replicants) + ). + +running_cores() -> + Running = emqx:running_nodes(), + lists:filter(fun(C) -> lists:member(C, Running) end, emqx:cluster_nodes(cores)). + invite_node(put, #{bindings := #{node := Node0}}) -> Node = ekka_node:parse_name(binary_to_list(Node0)), case emqx_mgmt_cluster_proto_v1:invite_node(Node, node()) of @@ -134,5 +216,9 @@ force_leave(delete, #{bindings := #{node := Node0}}) -> join(Node) -> ekka:join(Node). +-spec connected_replicants() -> [{atom(), node(), pid()}]. +connected_replicants() -> + mria_status:agents(). + error_message(Msg) -> iolist_to_binary(io_lib:format("~p", [Msg])). diff --git a/apps/emqx_management/src/proto/emqx_mgmt_cluster_proto_v2.erl b/apps/emqx_management/src/proto/emqx_mgmt_cluster_proto_v2.erl new file mode 100644 index 000000000..18b06321f --- /dev/null +++ b/apps/emqx_management/src/proto/emqx_mgmt_cluster_proto_v2.erl @@ -0,0 +1,38 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_mgmt_cluster_proto_v2). + +-behaviour(emqx_bpapi). + +-export([ + introduced_in/0, + invite_node/2, + connected_replicants/1 +]). + +-include_lib("emqx/include/bpapi.hrl"). + +introduced_in() -> + "5.1.1". + +-spec invite_node(node(), node()) -> ok | ignore | {error, term()} | emqx_rpc:badrpc(). +invite_node(Node, Self) -> + rpc:call(Node, emqx_mgmt_api_cluster, join, [Self], 5000). + +-spec connected_replicants([node()]) -> emqx_rpc:multicall_result(). +connected_replicants(Nodes) -> + rpc:multicall(Nodes, emqx_mgmt_api_cluster, connected_replicants, [], 30_000). diff --git a/apps/emqx_management/test/emqx_mgmt_api_cluster_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_cluster_SUITE.erl new file mode 100644 index 000000000..abe201217 --- /dev/null +++ b/apps/emqx_management/test/emqx_mgmt_api_cluster_SUITE.erl @@ -0,0 +1,102 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_mgmt_api_cluster_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). + +-define(APPS, [emqx_conf, emqx_management]). + +all() -> + emqx_common_test_helpers:all(?MODULE). + +init_per_suite(Config) -> + Config. + +end_per_suite(_) -> + ok. + +init_per_testcase(TC = t_cluster_topology_api_replicants, Config0) -> + Config = [{tc_name, TC} | Config0], + [{cluster, cluster(Config)} | setup(Config)]; +init_per_testcase(_TC, Config) -> + emqx_mgmt_api_test_util:init_suite(?APPS), + Config. + +end_per_testcase(t_cluster_topology_api_replicants, Config) -> + emqx_cth_cluster:stop(?config(cluster, Config)), + cleanup(Config); +end_per_testcase(_TC, _Config) -> + emqx_mgmt_api_test_util:end_suite(?APPS). + +t_cluster_topology_api_empty_resp(_) -> + ClusterTopologyPath = emqx_mgmt_api_test_util:api_path(["cluster", "topology"]), + {ok, Resp} = emqx_mgmt_api_test_util:request_api(get, ClusterTopologyPath), + ?assertEqual( + [#{<<"core_node">> => atom_to_binary(node()), <<"replicant_nodes">> => []}], + emqx_utils_json:decode(Resp, [return_maps]) + ). + +t_cluster_topology_api_replicants(Config) -> + %% some time to stabilize + timer:sleep(3000), + [Core1, Core2, Replicant] = _NodesList = ?config(cluster, Config), + {200, Core1Resp} = rpc:call(Core1, emqx_mgmt_api_cluster, cluster_topology, [get, #{}]), + {200, Core2Resp} = rpc:call(Core2, emqx_mgmt_api_cluster, cluster_topology, [get, #{}]), + {200, ReplResp} = rpc:call(Replicant, emqx_mgmt_api_cluster, cluster_topology, [get, #{}]), + [ + ?assertMatch( + [ + #{ + core_node := Core1, + replicant_nodes := + [#{node := Replicant, streams := _}] + }, + #{ + core_node := Core2, + replicant_nodes := + [#{node := Replicant, streams := _}] + } + ], + Resp + ) + || Resp <- [lists:sort(R) || R <- [Core1Resp, Core2Resp, ReplResp]] + ]. + +cluster(Config) -> + Nodes = emqx_cth_cluster:start( + [ + {data_backup_core1, #{role => core, apps => ?APPS}}, + {data_backup_core2, #{role => core, apps => ?APPS}}, + {data_backup_replicant, #{role => replicant, apps => ?APPS}} + ], + #{work_dir => work_dir(Config)} + ), + Nodes. + +setup(Config) -> + WorkDir = filename:join(work_dir(Config), local), + Started = emqx_cth_suite:start(?APPS, #{work_dir => WorkDir}), + [{suite_apps, Started} | Config]. + +cleanup(Config) -> + emqx_cth_suite:stop(?config(suite_apps, Config)). + +work_dir(Config) -> + filename:join(?config(priv_dir, Config), ?config(tc_name, Config)). diff --git a/changes/ce/feat-11251.en.md b/changes/ce/feat-11251.en.md new file mode 100644 index 000000000..85e7cf3c4 --- /dev/null +++ b/changes/ce/feat-11251.en.md @@ -0,0 +1,3 @@ +Add `/cluster/topology` HTTP API endpoint + +`GET` request to the endpoint returns the cluster topology: connections between RLOG core and replicant nodes. diff --git a/rel/i18n/emqx_mgmt_api_cluster.hocon b/rel/i18n/emqx_mgmt_api_cluster.hocon index f8b6de1a4..0e6fceba1 100644 --- a/rel/i18n/emqx_mgmt_api_cluster.hocon +++ b/rel/i18n/emqx_mgmt_api_cluster.hocon @@ -5,6 +5,11 @@ get_cluster_info.desc: get_cluster_info.label: """Get cluster info""" +get_cluster_topology.desc: +"""Get RLOG cluster topology: connections between core and replicant nodes.""" +get_cluster_topology.label: +"""Get cluster topology""" + invite_node.desc: """Invite node to cluster""" invite_node.label: