perf(bridge-api): ask bridge listings in parallel

Also rename response formatting functions to better clarify their
purpose.
This commit is contained in:
Andrew Mayorov 2023-03-13 14:51:28 +03:00
parent 53bc27e0f4
commit cad6492c99
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
5 changed files with 154 additions and 15 deletions

View File

@ -4,6 +4,7 @@
{emqx_authz,1}. {emqx_authz,1}.
{emqx_bridge,1}. {emqx_bridge,1}.
{emqx_bridge,2}. {emqx_bridge,2}.
{emqx_bridge,3}.
{emqx_broker,1}. {emqx_broker,1}.
{emqx_cm,1}. {emqx_cm,1}.
{emqx_conf,1}. {emqx_conf,1}.

View File

@ -483,11 +483,18 @@ schema("/bridges_probe") ->
end end
end; end;
'/bridges'(get, _Params) -> '/bridges'(get, _Params) ->
{200, Nodes = mria:running_nodes(),
zip_bridges([ NodeReplies = emqx_bridge_proto_v3:list_bridges_on_nodes(Nodes),
[format_resp(Data, Node) || Data <- emqx_bridge_proto_v1:list_bridges(Node)] case is_ok(NodeReplies) of
|| Node <- mria:running_nodes() {ok, NodeBridges} ->
])}. AllBridges = [
format_resource(Data, Node)
|| {Node, Bridges} <- lists:zip(Nodes, NodeBridges), Data <- Bridges
],
{200, zip_bridges([AllBridges])};
{error, Reason} ->
{500, error_msg('INTERNAL_ERROR', Reason)}
end.
'/bridges/:id'(get, #{bindings := #{id := Id}}) -> '/bridges/:id'(get, #{bindings := #{id := Id}}) ->
?TRY_PARSE_ID(Id, lookup_from_all_nodes(BridgeType, BridgeName, 200)); ?TRY_PARSE_ID(Id, lookup_from_all_nodes(BridgeType, BridgeName, 200));
@ -591,7 +598,7 @@ lookup_from_all_nodes_metrics(BridgeType, BridgeName, SuccCode) ->
do_lookup_from_all_nodes(BridgeType, BridgeName, SuccCode, FormatFun) -> do_lookup_from_all_nodes(BridgeType, BridgeName, SuccCode, FormatFun) ->
Nodes = mria:running_nodes(), Nodes = mria:running_nodes(),
case is_ok(emqx_bridge_proto_v1:lookup_from_all_nodes(Nodes, BridgeType, BridgeName)) of case is_ok(emqx_bridge_proto_v3:lookup_from_all_nodes(Nodes, BridgeType, BridgeName)) of
{ok, [{ok, _} | _] = Results} -> {ok, [{ok, _} | _] = Results} ->
{SuccCode, FormatFun([R || {ok, R} <- Results])}; {SuccCode, FormatFun([R || {ok, R} <- Results])};
{ok, [{error, not_found} | _]} -> {ok, [{error, not_found} | _]} ->
@ -602,7 +609,7 @@ do_lookup_from_all_nodes(BridgeType, BridgeName, SuccCode, FormatFun) ->
lookup_from_local_node(BridgeType, BridgeName) -> lookup_from_local_node(BridgeType, BridgeName) ->
case emqx_bridge:lookup(BridgeType, BridgeName) of case emqx_bridge:lookup(BridgeType, BridgeName) of
{ok, Res} -> {ok, format_resp(Res)}; {ok, Res} -> {ok, format_resource(Res, node())};
Error -> Error Error -> Error
end. end.
@ -802,10 +809,7 @@ aggregate_metrics(
aggregate_metrics(#{}, Metrics) -> aggregate_metrics(#{}, Metrics) ->
Metrics. Metrics.
format_resp(Data) -> format_resource(
format_resp(Data, node()).
format_resp(
#{ #{
type := Type, type := Type,
name := BridgeName, name := BridgeName,
@ -974,7 +978,7 @@ do_bpapi_call(Node, Call, Args) ->
do_bpapi_call_vsn(SupportedVersion, Call, Args) -> do_bpapi_call_vsn(SupportedVersion, Call, Args) ->
case lists:member(SupportedVersion, supported_versions(Call)) of case lists:member(SupportedVersion, supported_versions(Call)) of
true -> true ->
apply(emqx_bridge_proto_v2, Call, Args); apply(emqx_bridge_proto_v3, Call, Args);
false -> false ->
{error, not_implemented} {error, not_implemented}
end. end.
@ -984,9 +988,9 @@ maybe_unwrap({error, not_implemented}) ->
maybe_unwrap(RpcMulticallResult) -> maybe_unwrap(RpcMulticallResult) ->
emqx_rpc:unwrap_erpc(RpcMulticallResult). emqx_rpc:unwrap_erpc(RpcMulticallResult).
supported_versions(start_bridge_to_node) -> [2]; supported_versions(start_bridge_to_node) -> [2, 3];
supported_versions(start_bridges_to_all_nodes) -> [2]; supported_versions(start_bridges_to_all_nodes) -> [2, 3];
supported_versions(_Call) -> [1, 2]. supported_versions(_Call) -> [1, 2, 3].
to_hr_reason(nxdomain) -> to_hr_reason(nxdomain) ->
<<"Host not found">>; <<"Host not found">>;

View File

@ -20,6 +20,7 @@
-export([ -export([
introduced_in/0, introduced_in/0,
deprecated_since/0,
list_bridges/1, list_bridges/1,
restart_bridge_to_node/3, restart_bridge_to_node/3,
@ -38,6 +39,9 @@
introduced_in() -> introduced_in() ->
"5.0.17". "5.0.17".
deprecated_since() ->
"5.0.21".
-spec list_bridges(node()) -> list() | emqx_rpc:badrpc(). -spec list_bridges(node()) -> list() | emqx_rpc:badrpc().
list_bridges(Node) -> list_bridges(Node) ->
rpc:call(Node, emqx_bridge, list, [], ?TIMEOUT). rpc:call(Node, emqx_bridge, list, [], ?TIMEOUT).

View File

@ -0,0 +1,128 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_bridge_proto_v3).
-behaviour(emqx_bpapi).
-export([
introduced_in/0,
list_bridges/1,
list_bridges_on_nodes/1,
restart_bridge_to_node/3,
start_bridge_to_node/3,
stop_bridge_to_node/3,
lookup_from_all_nodes/3,
restart_bridges_to_all_nodes/3,
start_bridges_to_all_nodes/3,
stop_bridges_to_all_nodes/3
]).
-include_lib("emqx/include/bpapi.hrl").
-define(TIMEOUT, 15000).
introduced_in() ->
"5.0.21".
-spec list_bridges(node()) -> list() | emqx_rpc:badrpc().
list_bridges(Node) ->
rpc:call(Node, emqx_bridge, list, [], ?TIMEOUT).
-spec list_bridges_on_nodes([node()]) ->
emqx_rpc:erpc_multicall([emqx_resource:resource_data()]).
list_bridges_on_nodes(Nodes) ->
erpc:multicall(Nodes, emqx_bridge, list, [], ?TIMEOUT).
-type key() :: atom() | binary() | [byte()].
-spec restart_bridge_to_node(node(), key(), key()) ->
term().
restart_bridge_to_node(Node, BridgeType, BridgeName) ->
rpc:call(
Node,
emqx_bridge_resource,
restart,
[BridgeType, BridgeName],
?TIMEOUT
).
-spec start_bridge_to_node(node(), key(), key()) ->
term().
start_bridge_to_node(Node, BridgeType, BridgeName) ->
rpc:call(
Node,
emqx_bridge_resource,
start,
[BridgeType, BridgeName],
?TIMEOUT
).
-spec stop_bridge_to_node(node(), key(), key()) ->
term().
stop_bridge_to_node(Node, BridgeType, BridgeName) ->
rpc:call(
Node,
emqx_bridge_resource,
stop,
[BridgeType, BridgeName],
?TIMEOUT
).
-spec restart_bridges_to_all_nodes([node()], key(), key()) ->
emqx_rpc:erpc_multicall().
restart_bridges_to_all_nodes(Nodes, BridgeType, BridgeName) ->
erpc:multicall(
Nodes,
emqx_bridge_resource,
restart,
[BridgeType, BridgeName],
?TIMEOUT
).
-spec start_bridges_to_all_nodes([node()], key(), key()) ->
emqx_rpc:erpc_multicall().
start_bridges_to_all_nodes(Nodes, BridgeType, BridgeName) ->
erpc:multicall(
Nodes,
emqx_bridge_resource,
start,
[BridgeType, BridgeName],
?TIMEOUT
).
-spec stop_bridges_to_all_nodes([node()], key(), key()) ->
emqx_rpc:erpc_multicall().
stop_bridges_to_all_nodes(Nodes, BridgeType, BridgeName) ->
erpc:multicall(
Nodes,
emqx_bridge_resource,
stop,
[BridgeType, BridgeName],
?TIMEOUT
).
-spec lookup_from_all_nodes([node()], key(), key()) ->
emqx_rpc:erpc_multicall().
lookup_from_all_nodes(Nodes, BridgeType, BridgeName) ->
erpc:multicall(
Nodes,
emqx_bridge_api,
lookup_from_local_node,
[BridgeType, BridgeName],
?TIMEOUT
).

View File

@ -112,6 +112,8 @@
-export([apply_reply_fun/2]). -export([apply_reply_fun/2]).
-export_type([resource_data/0]).
-optional_callbacks([ -optional_callbacks([
on_query/3, on_query/3,
on_batch_query/3, on_batch_query/3,