From cad6492c990c4f694878fda8a1b64ed3bc776741 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Mon, 13 Mar 2023 14:51:28 +0300 Subject: [PATCH] perf(bridge-api): ask bridge listings in parallel Also rename response formatting functions to better clarify their purpose. --- apps/emqx/priv/bpapi.versions | 1 + apps/emqx_bridge/src/emqx_bridge_api.erl | 34 +++-- .../src/proto/emqx_bridge_proto_v2.erl | 4 + .../src/proto/emqx_bridge_proto_v3.erl | 128 ++++++++++++++++++ apps/emqx_resource/src/emqx_resource.erl | 2 + 5 files changed, 154 insertions(+), 15 deletions(-) create mode 100644 apps/emqx_bridge/src/proto/emqx_bridge_proto_v3.erl diff --git a/apps/emqx/priv/bpapi.versions b/apps/emqx/priv/bpapi.versions index 769145722..904611199 100644 --- a/apps/emqx/priv/bpapi.versions +++ b/apps/emqx/priv/bpapi.versions @@ -4,6 +4,7 @@ {emqx_authz,1}. {emqx_bridge,1}. {emqx_bridge,2}. +{emqx_bridge,3}. {emqx_broker,1}. {emqx_cm,1}. {emqx_conf,1}. diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index ff55976d0..e9f31d010 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -483,11 +483,18 @@ schema("/bridges_probe") -> end end; '/bridges'(get, _Params) -> - {200, - zip_bridges([ - [format_resp(Data, Node) || Data <- emqx_bridge_proto_v1:list_bridges(Node)] - || Node <- mria:running_nodes() - ])}. + Nodes = mria:running_nodes(), + NodeReplies = emqx_bridge_proto_v3:list_bridges_on_nodes(Nodes), + case is_ok(NodeReplies) of + {ok, NodeBridges} -> + AllBridges = [ + format_resource(Data, Node) + || {Node, Bridges} <- lists:zip(Nodes, NodeBridges), Data <- Bridges + ], + {200, zip_bridges([AllBridges])}; + {error, Reason} -> + {500, error_msg('INTERNAL_ERROR', Reason)} + end. '/bridges/:id'(get, #{bindings := #{id := Id}}) -> ?TRY_PARSE_ID(Id, lookup_from_all_nodes(BridgeType, BridgeName, 200)); @@ -591,7 +598,7 @@ lookup_from_all_nodes_metrics(BridgeType, BridgeName, SuccCode) -> do_lookup_from_all_nodes(BridgeType, BridgeName, SuccCode, FormatFun) -> Nodes = mria:running_nodes(), - case is_ok(emqx_bridge_proto_v1:lookup_from_all_nodes(Nodes, BridgeType, BridgeName)) of + case is_ok(emqx_bridge_proto_v3:lookup_from_all_nodes(Nodes, BridgeType, BridgeName)) of {ok, [{ok, _} | _] = Results} -> {SuccCode, FormatFun([R || {ok, R} <- Results])}; {ok, [{error, not_found} | _]} -> @@ -602,7 +609,7 @@ do_lookup_from_all_nodes(BridgeType, BridgeName, SuccCode, FormatFun) -> lookup_from_local_node(BridgeType, BridgeName) -> case emqx_bridge:lookup(BridgeType, BridgeName) of - {ok, Res} -> {ok, format_resp(Res)}; + {ok, Res} -> {ok, format_resource(Res, node())}; Error -> Error end. @@ -802,10 +809,7 @@ aggregate_metrics( aggregate_metrics(#{}, Metrics) -> Metrics. -format_resp(Data) -> - format_resp(Data, node()). - -format_resp( +format_resource( #{ type := Type, name := BridgeName, @@ -974,7 +978,7 @@ do_bpapi_call(Node, Call, Args) -> do_bpapi_call_vsn(SupportedVersion, Call, Args) -> case lists:member(SupportedVersion, supported_versions(Call)) of true -> - apply(emqx_bridge_proto_v2, Call, Args); + apply(emqx_bridge_proto_v3, Call, Args); false -> {error, not_implemented} end. @@ -984,9 +988,9 @@ maybe_unwrap({error, not_implemented}) -> maybe_unwrap(RpcMulticallResult) -> emqx_rpc:unwrap_erpc(RpcMulticallResult). -supported_versions(start_bridge_to_node) -> [2]; -supported_versions(start_bridges_to_all_nodes) -> [2]; -supported_versions(_Call) -> [1, 2]. +supported_versions(start_bridge_to_node) -> [2, 3]; +supported_versions(start_bridges_to_all_nodes) -> [2, 3]; +supported_versions(_Call) -> [1, 2, 3]. to_hr_reason(nxdomain) -> <<"Host not found">>; diff --git a/apps/emqx_bridge/src/proto/emqx_bridge_proto_v2.erl b/apps/emqx_bridge/src/proto/emqx_bridge_proto_v2.erl index 0fd733380..bcf6ca198 100644 --- a/apps/emqx_bridge/src/proto/emqx_bridge_proto_v2.erl +++ b/apps/emqx_bridge/src/proto/emqx_bridge_proto_v2.erl @@ -20,6 +20,7 @@ -export([ introduced_in/0, + deprecated_since/0, list_bridges/1, restart_bridge_to_node/3, @@ -38,6 +39,9 @@ introduced_in() -> "5.0.17". +deprecated_since() -> + "5.0.21". + -spec list_bridges(node()) -> list() | emqx_rpc:badrpc(). list_bridges(Node) -> rpc:call(Node, emqx_bridge, list, [], ?TIMEOUT). diff --git a/apps/emqx_bridge/src/proto/emqx_bridge_proto_v3.erl b/apps/emqx_bridge/src/proto/emqx_bridge_proto_v3.erl new file mode 100644 index 000000000..a35db5d96 --- /dev/null +++ b/apps/emqx_bridge/src/proto/emqx_bridge_proto_v3.erl @@ -0,0 +1,128 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_proto_v3). + +-behaviour(emqx_bpapi). + +-export([ + introduced_in/0, + + list_bridges/1, + list_bridges_on_nodes/1, + restart_bridge_to_node/3, + start_bridge_to_node/3, + stop_bridge_to_node/3, + lookup_from_all_nodes/3, + restart_bridges_to_all_nodes/3, + start_bridges_to_all_nodes/3, + stop_bridges_to_all_nodes/3 +]). + +-include_lib("emqx/include/bpapi.hrl"). + +-define(TIMEOUT, 15000). + +introduced_in() -> + "5.0.21". + +-spec list_bridges(node()) -> list() | emqx_rpc:badrpc(). +list_bridges(Node) -> + rpc:call(Node, emqx_bridge, list, [], ?TIMEOUT). + +-spec list_bridges_on_nodes([node()]) -> + emqx_rpc:erpc_multicall([emqx_resource:resource_data()]). +list_bridges_on_nodes(Nodes) -> + erpc:multicall(Nodes, emqx_bridge, list, [], ?TIMEOUT). + +-type key() :: atom() | binary() | [byte()]. + +-spec restart_bridge_to_node(node(), key(), key()) -> + term(). +restart_bridge_to_node(Node, BridgeType, BridgeName) -> + rpc:call( + Node, + emqx_bridge_resource, + restart, + [BridgeType, BridgeName], + ?TIMEOUT + ). + +-spec start_bridge_to_node(node(), key(), key()) -> + term(). +start_bridge_to_node(Node, BridgeType, BridgeName) -> + rpc:call( + Node, + emqx_bridge_resource, + start, + [BridgeType, BridgeName], + ?TIMEOUT + ). + +-spec stop_bridge_to_node(node(), key(), key()) -> + term(). +stop_bridge_to_node(Node, BridgeType, BridgeName) -> + rpc:call( + Node, + emqx_bridge_resource, + stop, + [BridgeType, BridgeName], + ?TIMEOUT + ). + +-spec restart_bridges_to_all_nodes([node()], key(), key()) -> + emqx_rpc:erpc_multicall(). +restart_bridges_to_all_nodes(Nodes, BridgeType, BridgeName) -> + erpc:multicall( + Nodes, + emqx_bridge_resource, + restart, + [BridgeType, BridgeName], + ?TIMEOUT + ). + +-spec start_bridges_to_all_nodes([node()], key(), key()) -> + emqx_rpc:erpc_multicall(). +start_bridges_to_all_nodes(Nodes, BridgeType, BridgeName) -> + erpc:multicall( + Nodes, + emqx_bridge_resource, + start, + [BridgeType, BridgeName], + ?TIMEOUT + ). + +-spec stop_bridges_to_all_nodes([node()], key(), key()) -> + emqx_rpc:erpc_multicall(). +stop_bridges_to_all_nodes(Nodes, BridgeType, BridgeName) -> + erpc:multicall( + Nodes, + emqx_bridge_resource, + stop, + [BridgeType, BridgeName], + ?TIMEOUT + ). + +-spec lookup_from_all_nodes([node()], key(), key()) -> + emqx_rpc:erpc_multicall(). +lookup_from_all_nodes(Nodes, BridgeType, BridgeName) -> + erpc:multicall( + Nodes, + emqx_bridge_api, + lookup_from_local_node, + [BridgeType, BridgeName], + ?TIMEOUT + ). diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index 2c6865e04..57d56b339 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -112,6 +112,8 @@ -export([apply_reply_fun/2]). +-export_type([resource_data/0]). + -optional_callbacks([ on_query/3, on_batch_query/3,