perf(bridge-api): ask bridge listings in parallel

Also rename response formatting functions to better clarify their
purpose.
This commit is contained in:
Andrew Mayorov 2023-03-13 14:51:28 +03:00
parent 53bc27e0f4
commit cad6492c99
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
5 changed files with 154 additions and 15 deletions

View File

@ -4,6 +4,7 @@
{emqx_authz,1}.
{emqx_bridge,1}.
{emqx_bridge,2}.
{emqx_bridge,3}.
{emqx_broker,1}.
{emqx_cm,1}.
{emqx_conf,1}.

View File

@ -483,11 +483,18 @@ schema("/bridges_probe") ->
end
end;
'/bridges'(get, _Params) ->
{200,
zip_bridges([
[format_resp(Data, Node) || Data <- emqx_bridge_proto_v1:list_bridges(Node)]
|| Node <- mria:running_nodes()
])}.
Nodes = mria:running_nodes(),
NodeReplies = emqx_bridge_proto_v3:list_bridges_on_nodes(Nodes),
case is_ok(NodeReplies) of
{ok, NodeBridges} ->
AllBridges = [
format_resource(Data, Node)
|| {Node, Bridges} <- lists:zip(Nodes, NodeBridges), Data <- Bridges
],
{200, zip_bridges([AllBridges])};
{error, Reason} ->
{500, error_msg('INTERNAL_ERROR', Reason)}
end.
'/bridges/:id'(get, #{bindings := #{id := Id}}) ->
?TRY_PARSE_ID(Id, lookup_from_all_nodes(BridgeType, BridgeName, 200));
@ -591,7 +598,7 @@ lookup_from_all_nodes_metrics(BridgeType, BridgeName, SuccCode) ->
do_lookup_from_all_nodes(BridgeType, BridgeName, SuccCode, FormatFun) ->
Nodes = mria:running_nodes(),
case is_ok(emqx_bridge_proto_v1:lookup_from_all_nodes(Nodes, BridgeType, BridgeName)) of
case is_ok(emqx_bridge_proto_v3:lookup_from_all_nodes(Nodes, BridgeType, BridgeName)) of
{ok, [{ok, _} | _] = Results} ->
{SuccCode, FormatFun([R || {ok, R} <- Results])};
{ok, [{error, not_found} | _]} ->
@ -602,7 +609,7 @@ do_lookup_from_all_nodes(BridgeType, BridgeName, SuccCode, FormatFun) ->
lookup_from_local_node(BridgeType, BridgeName) ->
case emqx_bridge:lookup(BridgeType, BridgeName) of
{ok, Res} -> {ok, format_resp(Res)};
{ok, Res} -> {ok, format_resource(Res, node())};
Error -> Error
end.
@ -802,10 +809,7 @@ aggregate_metrics(
aggregate_metrics(#{}, Metrics) ->
Metrics.
format_resp(Data) ->
format_resp(Data, node()).
format_resp(
format_resource(
#{
type := Type,
name := BridgeName,
@ -974,7 +978,7 @@ do_bpapi_call(Node, Call, Args) ->
do_bpapi_call_vsn(SupportedVersion, Call, Args) ->
case lists:member(SupportedVersion, supported_versions(Call)) of
true ->
apply(emqx_bridge_proto_v2, Call, Args);
apply(emqx_bridge_proto_v3, Call, Args);
false ->
{error, not_implemented}
end.
@ -984,9 +988,9 @@ maybe_unwrap({error, not_implemented}) ->
maybe_unwrap(RpcMulticallResult) ->
emqx_rpc:unwrap_erpc(RpcMulticallResult).
supported_versions(start_bridge_to_node) -> [2];
supported_versions(start_bridges_to_all_nodes) -> [2];
supported_versions(_Call) -> [1, 2].
supported_versions(start_bridge_to_node) -> [2, 3];
supported_versions(start_bridges_to_all_nodes) -> [2, 3];
supported_versions(_Call) -> [1, 2, 3].
to_hr_reason(nxdomain) ->
<<"Host not found">>;

View File

@ -20,6 +20,7 @@
-export([
introduced_in/0,
deprecated_since/0,
list_bridges/1,
restart_bridge_to_node/3,
@ -38,6 +39,9 @@
introduced_in() ->
"5.0.17".
deprecated_since() ->
"5.0.21".
-spec list_bridges(node()) -> list() | emqx_rpc:badrpc().
list_bridges(Node) ->
rpc:call(Node, emqx_bridge, list, [], ?TIMEOUT).

View File

@ -0,0 +1,128 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_bridge_proto_v3).
-behaviour(emqx_bpapi).
-export([
introduced_in/0,
list_bridges/1,
list_bridges_on_nodes/1,
restart_bridge_to_node/3,
start_bridge_to_node/3,
stop_bridge_to_node/3,
lookup_from_all_nodes/3,
restart_bridges_to_all_nodes/3,
start_bridges_to_all_nodes/3,
stop_bridges_to_all_nodes/3
]).
-include_lib("emqx/include/bpapi.hrl").
-define(TIMEOUT, 15000).
introduced_in() ->
"5.0.21".
-spec list_bridges(node()) -> list() | emqx_rpc:badrpc().
list_bridges(Node) ->
rpc:call(Node, emqx_bridge, list, [], ?TIMEOUT).
-spec list_bridges_on_nodes([node()]) ->
emqx_rpc:erpc_multicall([emqx_resource:resource_data()]).
list_bridges_on_nodes(Nodes) ->
erpc:multicall(Nodes, emqx_bridge, list, [], ?TIMEOUT).
-type key() :: atom() | binary() | [byte()].
-spec restart_bridge_to_node(node(), key(), key()) ->
term().
restart_bridge_to_node(Node, BridgeType, BridgeName) ->
rpc:call(
Node,
emqx_bridge_resource,
restart,
[BridgeType, BridgeName],
?TIMEOUT
).
-spec start_bridge_to_node(node(), key(), key()) ->
term().
start_bridge_to_node(Node, BridgeType, BridgeName) ->
rpc:call(
Node,
emqx_bridge_resource,
start,
[BridgeType, BridgeName],
?TIMEOUT
).
-spec stop_bridge_to_node(node(), key(), key()) ->
term().
stop_bridge_to_node(Node, BridgeType, BridgeName) ->
rpc:call(
Node,
emqx_bridge_resource,
stop,
[BridgeType, BridgeName],
?TIMEOUT
).
-spec restart_bridges_to_all_nodes([node()], key(), key()) ->
emqx_rpc:erpc_multicall().
restart_bridges_to_all_nodes(Nodes, BridgeType, BridgeName) ->
erpc:multicall(
Nodes,
emqx_bridge_resource,
restart,
[BridgeType, BridgeName],
?TIMEOUT
).
-spec start_bridges_to_all_nodes([node()], key(), key()) ->
emqx_rpc:erpc_multicall().
start_bridges_to_all_nodes(Nodes, BridgeType, BridgeName) ->
erpc:multicall(
Nodes,
emqx_bridge_resource,
start,
[BridgeType, BridgeName],
?TIMEOUT
).
-spec stop_bridges_to_all_nodes([node()], key(), key()) ->
emqx_rpc:erpc_multicall().
stop_bridges_to_all_nodes(Nodes, BridgeType, BridgeName) ->
erpc:multicall(
Nodes,
emqx_bridge_resource,
stop,
[BridgeType, BridgeName],
?TIMEOUT
).
-spec lookup_from_all_nodes([node()], key(), key()) ->
emqx_rpc:erpc_multicall().
lookup_from_all_nodes(Nodes, BridgeType, BridgeName) ->
erpc:multicall(
Nodes,
emqx_bridge_api,
lookup_from_local_node,
[BridgeType, BridgeName],
?TIMEOUT
).

View File

@ -112,6 +112,8 @@
-export([apply_reply_fun/2]).
-export_type([resource_data/0]).
-optional_callbacks([
on_query/3,
on_batch_query/3,