diff --git a/apps/emqx/src/emqx_rpc.erl b/apps/emqx/src/emqx_rpc.erl index 91e796725..0cdc09b22 100644 --- a/apps/emqx/src/emqx_rpc.erl +++ b/apps/emqx/src/emqx_rpc.erl @@ -32,7 +32,7 @@ , cast_result/0 , multicall_result/0 , erpc/1 - , erpc_multicast/1 + , erpc_multicall/1 ]). -compile({inline, @@ -56,7 +56,7 @@ | {error, {exception, _Reason, _Stack :: list()}} | {error, {erpc, _Reason}}. --type erpc_multicast(Ret) :: [erpc(Ret)]. +-type erpc_multicall(Ret) :: [erpc(Ret)]. -spec call(node(), module(), atom(), list()) -> call_result(). call(Node, Mod, Fun, Args) -> diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index 847495cbc..d78cbf5f2 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -29,8 +29,7 @@ -export(['/bridges'/2, '/bridges/:id'/2, '/bridges/:id/operation/:operation'/2]). --export([ list_local_bridges/1 - , lookup_from_local_node/2 +-export([ lookup_from_local_node/2 ]). -define(TYPES, [mqtt, http]). @@ -288,12 +287,8 @@ schema("/bridges/:id/operation/:operation") -> end end; '/bridges'(get, _Params) -> - {200, zip_bridges([list_local_bridges(Node) || Node <- mria_mnesia:running_nodes()])}. - -list_local_bridges(Node) when Node =:= node() -> - [format_resp(Data) || Data <- emqx_bridge:list()]; -list_local_bridges(Node) -> - rpc_call(Node, list_local_bridges, [Node]). + {200, zip_bridges([[format_resp(Data) || Data <- emqx_bridge_proto_v1:list_bridges(Node)] + || Node <- mria_mnesia:running_nodes()])}. '/bridges/:id'(get, #{bindings := #{id := Id}}) -> ?TRY_PARSE_ID(Id, lookup_from_all_nodes(BridgeType, BridgeName, 200)); @@ -321,7 +316,8 @@ list_local_bridges(Node) -> end). lookup_from_all_nodes(BridgeType, BridgeName, SuccCode) -> - case rpc_multicall(lookup_from_local_node, [BridgeType, BridgeName]) of + Nodes = mria_mnesia:running_nodes(), + case is_ok(emqx_bridge_proto_v1:lookup_from_all_nodes(Nodes, BridgeType, BridgeName)) of {ok, [{ok, _} | _] = Results} -> {SuccCode, format_bridge_info([R || {ok, R} <- Results])}; {ok, [{error, not_found} | _]} -> @@ -433,9 +429,8 @@ format_metrics(#{ } }) -> ?METRICS(Match, Succ, Failed + Ex, Rate, Rate5m, RateMax). -rpc_multicall(Func, Args) -> - Nodes = mria_mnesia:running_nodes(), - ResL = erpc:multicall(Nodes, ?MODULE, Func, Args, 15000), + +is_ok(ResL) -> case lists:filter(fun({ok, _}) -> false; (_) -> true end, ResL) of [] -> {ok, [Res || {ok, Res} <- ResL]}; ErrL -> {error, ErrL} @@ -446,21 +441,6 @@ filter_out_request_body(Conf) -> <<"node_metrics">>, <<"metrics">>, <<"node">>], maps:without(ExtraConfs, Conf). -rpc_call(Node, Fun, Args) -> - rpc_call(Node, ?MODULE, Fun, Args). - -rpc_call(Node, Mod, Fun, Args) when Node =:= node() -> - apply(Mod, Fun, Args); -rpc_call(Node, Mod, Fun, Args) -> - case rpc:call(Node, Mod, Fun, Args) of - {badrpc, Reason} -> {error, Reason}; - Res -> Res - end. - -wrap_rpc({badrpc, Reason}) -> {error, Reason}; -wrap_rpc(Ret) -> Ret. - - error_msg(Code, Msg) when is_binary(Msg) -> #{code => Code, message => Msg}; error_msg(Code, Msg) -> diff --git a/apps/emqx_bridge/src/proto/emqx_bridge_proto_v1.erl b/apps/emqx_bridge/src/proto/emqx_bridge_proto_v1.erl new file mode 100644 index 000000000..71ea1d2dc --- /dev/null +++ b/apps/emqx_bridge/src/proto/emqx_bridge_proto_v1.erl @@ -0,0 +1,43 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_proto_v1). + +-behaviour(emqx_bpapi). + +-export([ introduced_in/0 + + , list_bridges/1 + , lookup_from_all_nodes/3 + ]). + +-include_lib("emqx/include/bpapi.hrl"). + +-define(TIMEOUT, 15000). + +introduced_in() -> + "5.0.0". + +-spec list_bridges(node()) -> list() | emqx_rpc:badrpc(). +list_bridges(Node) -> + rpc:call(Node, emqx_bridge, list, [], ?TIMEOUT). + +-type key() :: atom() | binary() | [byte()]. + +-spec lookup_from_all_nodes([node()], key(), key()) -> + emqx_rpc:erpc_multicall(). +lookup_from_all_nodes(Nodes, BridgeType, BridgeName) -> + erpc:multicall(Nodes, emqx_bridge_api, lookup_from_local_node, [BridgeType, BridgeName], ?TIMEOUT).