perf(bridge-api): ask bridge listings in parallel
Also rename response formatting functions to better clarify their purpose.
This commit is contained in:
parent
53bc27e0f4
commit
cad6492c99
|
@ -4,6 +4,7 @@
|
||||||
{emqx_authz,1}.
|
{emqx_authz,1}.
|
||||||
{emqx_bridge,1}.
|
{emqx_bridge,1}.
|
||||||
{emqx_bridge,2}.
|
{emqx_bridge,2}.
|
||||||
|
{emqx_bridge,3}.
|
||||||
{emqx_broker,1}.
|
{emqx_broker,1}.
|
||||||
{emqx_cm,1}.
|
{emqx_cm,1}.
|
||||||
{emqx_conf,1}.
|
{emqx_conf,1}.
|
||||||
|
|
|
@ -483,11 +483,18 @@ schema("/bridges_probe") ->
|
||||||
end
|
end
|
||||||
end;
|
end;
|
||||||
'/bridges'(get, _Params) ->
|
'/bridges'(get, _Params) ->
|
||||||
{200,
|
Nodes = mria:running_nodes(),
|
||||||
zip_bridges([
|
NodeReplies = emqx_bridge_proto_v3:list_bridges_on_nodes(Nodes),
|
||||||
[format_resp(Data, Node) || Data <- emqx_bridge_proto_v1:list_bridges(Node)]
|
case is_ok(NodeReplies) of
|
||||||
|| Node <- mria:running_nodes()
|
{ok, NodeBridges} ->
|
||||||
])}.
|
AllBridges = [
|
||||||
|
format_resource(Data, Node)
|
||||||
|
|| {Node, Bridges} <- lists:zip(Nodes, NodeBridges), Data <- Bridges
|
||||||
|
],
|
||||||
|
{200, zip_bridges([AllBridges])};
|
||||||
|
{error, Reason} ->
|
||||||
|
{500, error_msg('INTERNAL_ERROR', Reason)}
|
||||||
|
end.
|
||||||
|
|
||||||
'/bridges/:id'(get, #{bindings := #{id := Id}}) ->
|
'/bridges/:id'(get, #{bindings := #{id := Id}}) ->
|
||||||
?TRY_PARSE_ID(Id, lookup_from_all_nodes(BridgeType, BridgeName, 200));
|
?TRY_PARSE_ID(Id, lookup_from_all_nodes(BridgeType, BridgeName, 200));
|
||||||
|
@ -591,7 +598,7 @@ lookup_from_all_nodes_metrics(BridgeType, BridgeName, SuccCode) ->
|
||||||
|
|
||||||
do_lookup_from_all_nodes(BridgeType, BridgeName, SuccCode, FormatFun) ->
|
do_lookup_from_all_nodes(BridgeType, BridgeName, SuccCode, FormatFun) ->
|
||||||
Nodes = mria:running_nodes(),
|
Nodes = mria:running_nodes(),
|
||||||
case is_ok(emqx_bridge_proto_v1:lookup_from_all_nodes(Nodes, BridgeType, BridgeName)) of
|
case is_ok(emqx_bridge_proto_v3:lookup_from_all_nodes(Nodes, BridgeType, BridgeName)) of
|
||||||
{ok, [{ok, _} | _] = Results} ->
|
{ok, [{ok, _} | _] = Results} ->
|
||||||
{SuccCode, FormatFun([R || {ok, R} <- Results])};
|
{SuccCode, FormatFun([R || {ok, R} <- Results])};
|
||||||
{ok, [{error, not_found} | _]} ->
|
{ok, [{error, not_found} | _]} ->
|
||||||
|
@ -602,7 +609,7 @@ do_lookup_from_all_nodes(BridgeType, BridgeName, SuccCode, FormatFun) ->
|
||||||
|
|
||||||
lookup_from_local_node(BridgeType, BridgeName) ->
|
lookup_from_local_node(BridgeType, BridgeName) ->
|
||||||
case emqx_bridge:lookup(BridgeType, BridgeName) of
|
case emqx_bridge:lookup(BridgeType, BridgeName) of
|
||||||
{ok, Res} -> {ok, format_resp(Res)};
|
{ok, Res} -> {ok, format_resource(Res, node())};
|
||||||
Error -> Error
|
Error -> Error
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
@ -802,10 +809,7 @@ aggregate_metrics(
|
||||||
aggregate_metrics(#{}, Metrics) ->
|
aggregate_metrics(#{}, Metrics) ->
|
||||||
Metrics.
|
Metrics.
|
||||||
|
|
||||||
format_resp(Data) ->
|
format_resource(
|
||||||
format_resp(Data, node()).
|
|
||||||
|
|
||||||
format_resp(
|
|
||||||
#{
|
#{
|
||||||
type := Type,
|
type := Type,
|
||||||
name := BridgeName,
|
name := BridgeName,
|
||||||
|
@ -974,7 +978,7 @@ do_bpapi_call(Node, Call, Args) ->
|
||||||
do_bpapi_call_vsn(SupportedVersion, Call, Args) ->
|
do_bpapi_call_vsn(SupportedVersion, Call, Args) ->
|
||||||
case lists:member(SupportedVersion, supported_versions(Call)) of
|
case lists:member(SupportedVersion, supported_versions(Call)) of
|
||||||
true ->
|
true ->
|
||||||
apply(emqx_bridge_proto_v2, Call, Args);
|
apply(emqx_bridge_proto_v3, Call, Args);
|
||||||
false ->
|
false ->
|
||||||
{error, not_implemented}
|
{error, not_implemented}
|
||||||
end.
|
end.
|
||||||
|
@ -984,9 +988,9 @@ maybe_unwrap({error, not_implemented}) ->
|
||||||
maybe_unwrap(RpcMulticallResult) ->
|
maybe_unwrap(RpcMulticallResult) ->
|
||||||
emqx_rpc:unwrap_erpc(RpcMulticallResult).
|
emqx_rpc:unwrap_erpc(RpcMulticallResult).
|
||||||
|
|
||||||
supported_versions(start_bridge_to_node) -> [2];
|
supported_versions(start_bridge_to_node) -> [2, 3];
|
||||||
supported_versions(start_bridges_to_all_nodes) -> [2];
|
supported_versions(start_bridges_to_all_nodes) -> [2, 3];
|
||||||
supported_versions(_Call) -> [1, 2].
|
supported_versions(_Call) -> [1, 2, 3].
|
||||||
|
|
||||||
to_hr_reason(nxdomain) ->
|
to_hr_reason(nxdomain) ->
|
||||||
<<"Host not found">>;
|
<<"Host not found">>;
|
||||||
|
|
|
@ -20,6 +20,7 @@
|
||||||
|
|
||||||
-export([
|
-export([
|
||||||
introduced_in/0,
|
introduced_in/0,
|
||||||
|
deprecated_since/0,
|
||||||
|
|
||||||
list_bridges/1,
|
list_bridges/1,
|
||||||
restart_bridge_to_node/3,
|
restart_bridge_to_node/3,
|
||||||
|
@ -38,6 +39,9 @@
|
||||||
introduced_in() ->
|
introduced_in() ->
|
||||||
"5.0.17".
|
"5.0.17".
|
||||||
|
|
||||||
|
deprecated_since() ->
|
||||||
|
"5.0.21".
|
||||||
|
|
||||||
-spec list_bridges(node()) -> list() | emqx_rpc:badrpc().
|
-spec list_bridges(node()) -> list() | emqx_rpc:badrpc().
|
||||||
list_bridges(Node) ->
|
list_bridges(Node) ->
|
||||||
rpc:call(Node, emqx_bridge, list, [], ?TIMEOUT).
|
rpc:call(Node, emqx_bridge, list, [], ?TIMEOUT).
|
||||||
|
|
|
@ -0,0 +1,128 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-module(emqx_bridge_proto_v3).
|
||||||
|
|
||||||
|
-behaviour(emqx_bpapi).
|
||||||
|
|
||||||
|
-export([
|
||||||
|
introduced_in/0,
|
||||||
|
|
||||||
|
list_bridges/1,
|
||||||
|
list_bridges_on_nodes/1,
|
||||||
|
restart_bridge_to_node/3,
|
||||||
|
start_bridge_to_node/3,
|
||||||
|
stop_bridge_to_node/3,
|
||||||
|
lookup_from_all_nodes/3,
|
||||||
|
restart_bridges_to_all_nodes/3,
|
||||||
|
start_bridges_to_all_nodes/3,
|
||||||
|
stop_bridges_to_all_nodes/3
|
||||||
|
]).
|
||||||
|
|
||||||
|
-include_lib("emqx/include/bpapi.hrl").
|
||||||
|
|
||||||
|
-define(TIMEOUT, 15000).
|
||||||
|
|
||||||
|
introduced_in() ->
|
||||||
|
"5.0.21".
|
||||||
|
|
||||||
|
-spec list_bridges(node()) -> list() | emqx_rpc:badrpc().
|
||||||
|
list_bridges(Node) ->
|
||||||
|
rpc:call(Node, emqx_bridge, list, [], ?TIMEOUT).
|
||||||
|
|
||||||
|
-spec list_bridges_on_nodes([node()]) ->
|
||||||
|
emqx_rpc:erpc_multicall([emqx_resource:resource_data()]).
|
||||||
|
list_bridges_on_nodes(Nodes) ->
|
||||||
|
erpc:multicall(Nodes, emqx_bridge, list, [], ?TIMEOUT).
|
||||||
|
|
||||||
|
-type key() :: atom() | binary() | [byte()].
|
||||||
|
|
||||||
|
-spec restart_bridge_to_node(node(), key(), key()) ->
|
||||||
|
term().
|
||||||
|
restart_bridge_to_node(Node, BridgeType, BridgeName) ->
|
||||||
|
rpc:call(
|
||||||
|
Node,
|
||||||
|
emqx_bridge_resource,
|
||||||
|
restart,
|
||||||
|
[BridgeType, BridgeName],
|
||||||
|
?TIMEOUT
|
||||||
|
).
|
||||||
|
|
||||||
|
-spec start_bridge_to_node(node(), key(), key()) ->
|
||||||
|
term().
|
||||||
|
start_bridge_to_node(Node, BridgeType, BridgeName) ->
|
||||||
|
rpc:call(
|
||||||
|
Node,
|
||||||
|
emqx_bridge_resource,
|
||||||
|
start,
|
||||||
|
[BridgeType, BridgeName],
|
||||||
|
?TIMEOUT
|
||||||
|
).
|
||||||
|
|
||||||
|
-spec stop_bridge_to_node(node(), key(), key()) ->
|
||||||
|
term().
|
||||||
|
stop_bridge_to_node(Node, BridgeType, BridgeName) ->
|
||||||
|
rpc:call(
|
||||||
|
Node,
|
||||||
|
emqx_bridge_resource,
|
||||||
|
stop,
|
||||||
|
[BridgeType, BridgeName],
|
||||||
|
?TIMEOUT
|
||||||
|
).
|
||||||
|
|
||||||
|
-spec restart_bridges_to_all_nodes([node()], key(), key()) ->
|
||||||
|
emqx_rpc:erpc_multicall().
|
||||||
|
restart_bridges_to_all_nodes(Nodes, BridgeType, BridgeName) ->
|
||||||
|
erpc:multicall(
|
||||||
|
Nodes,
|
||||||
|
emqx_bridge_resource,
|
||||||
|
restart,
|
||||||
|
[BridgeType, BridgeName],
|
||||||
|
?TIMEOUT
|
||||||
|
).
|
||||||
|
|
||||||
|
-spec start_bridges_to_all_nodes([node()], key(), key()) ->
|
||||||
|
emqx_rpc:erpc_multicall().
|
||||||
|
start_bridges_to_all_nodes(Nodes, BridgeType, BridgeName) ->
|
||||||
|
erpc:multicall(
|
||||||
|
Nodes,
|
||||||
|
emqx_bridge_resource,
|
||||||
|
start,
|
||||||
|
[BridgeType, BridgeName],
|
||||||
|
?TIMEOUT
|
||||||
|
).
|
||||||
|
|
||||||
|
-spec stop_bridges_to_all_nodes([node()], key(), key()) ->
|
||||||
|
emqx_rpc:erpc_multicall().
|
||||||
|
stop_bridges_to_all_nodes(Nodes, BridgeType, BridgeName) ->
|
||||||
|
erpc:multicall(
|
||||||
|
Nodes,
|
||||||
|
emqx_bridge_resource,
|
||||||
|
stop,
|
||||||
|
[BridgeType, BridgeName],
|
||||||
|
?TIMEOUT
|
||||||
|
).
|
||||||
|
|
||||||
|
-spec lookup_from_all_nodes([node()], key(), key()) ->
|
||||||
|
emqx_rpc:erpc_multicall().
|
||||||
|
lookup_from_all_nodes(Nodes, BridgeType, BridgeName) ->
|
||||||
|
erpc:multicall(
|
||||||
|
Nodes,
|
||||||
|
emqx_bridge_api,
|
||||||
|
lookup_from_local_node,
|
||||||
|
[BridgeType, BridgeName],
|
||||||
|
?TIMEOUT
|
||||||
|
).
|
|
@ -112,6 +112,8 @@
|
||||||
|
|
||||||
-export([apply_reply_fun/2]).
|
-export([apply_reply_fun/2]).
|
||||||
|
|
||||||
|
-export_type([resource_data/0]).
|
||||||
|
|
||||||
-optional_callbacks([
|
-optional_callbacks([
|
||||||
on_query/3,
|
on_query/3,
|
||||||
on_batch_query/3,
|
on_batch_query/3,
|
||||||
|
|
Loading…
Reference in New Issue