refactor(emqx_bridge): Decorate remote procedure calls
This commit is contained in:
parent
e969eff702
commit
39766d0ab4
|
@ -32,7 +32,7 @@
|
||||||
, cast_result/0
|
, cast_result/0
|
||||||
, multicall_result/0
|
, multicall_result/0
|
||||||
, erpc/1
|
, erpc/1
|
||||||
, erpc_multicast/1
|
, erpc_multicall/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-compile({inline,
|
-compile({inline,
|
||||||
|
@ -56,7 +56,7 @@
|
||||||
| {error, {exception, _Reason, _Stack :: list()}}
|
| {error, {exception, _Reason, _Stack :: list()}}
|
||||||
| {error, {erpc, _Reason}}.
|
| {error, {erpc, _Reason}}.
|
||||||
|
|
||||||
-type erpc_multicast(Ret) :: [erpc(Ret)].
|
-type erpc_multicall(Ret) :: [erpc(Ret)].
|
||||||
|
|
||||||
-spec call(node(), module(), atom(), list()) -> call_result().
|
-spec call(node(), module(), atom(), list()) -> call_result().
|
||||||
call(Node, Mod, Fun, Args) ->
|
call(Node, Mod, Fun, Args) ->
|
||||||
|
|
|
@ -29,8 +29,7 @@
|
||||||
-export(['/bridges'/2, '/bridges/:id'/2,
|
-export(['/bridges'/2, '/bridges/:id'/2,
|
||||||
'/bridges/:id/operation/:operation'/2]).
|
'/bridges/:id/operation/:operation'/2]).
|
||||||
|
|
||||||
-export([ list_local_bridges/1
|
-export([ lookup_from_local_node/2
|
||||||
, lookup_from_local_node/2
|
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-define(TYPES, [mqtt, http]).
|
-define(TYPES, [mqtt, http]).
|
||||||
|
@ -288,12 +287,8 @@ schema("/bridges/:id/operation/:operation") ->
|
||||||
end
|
end
|
||||||
end;
|
end;
|
||||||
'/bridges'(get, _Params) ->
|
'/bridges'(get, _Params) ->
|
||||||
{200, zip_bridges([list_local_bridges(Node) || Node <- mria_mnesia:running_nodes()])}.
|
{200, zip_bridges([[format_resp(Data) || Data <- emqx_bridge_proto_v1:list_bridges(Node)]
|
||||||
|
|| Node <- mria_mnesia:running_nodes()])}.
|
||||||
list_local_bridges(Node) when Node =:= node() ->
|
|
||||||
[format_resp(Data) || Data <- emqx_bridge:list()];
|
|
||||||
list_local_bridges(Node) ->
|
|
||||||
rpc_call(Node, list_local_bridges, [Node]).
|
|
||||||
|
|
||||||
'/bridges/:id'(get, #{bindings := #{id := Id}}) ->
|
'/bridges/:id'(get, #{bindings := #{id := Id}}) ->
|
||||||
?TRY_PARSE_ID(Id, lookup_from_all_nodes(BridgeType, BridgeName, 200));
|
?TRY_PARSE_ID(Id, lookup_from_all_nodes(BridgeType, BridgeName, 200));
|
||||||
|
@ -321,7 +316,8 @@ list_local_bridges(Node) ->
|
||||||
end).
|
end).
|
||||||
|
|
||||||
lookup_from_all_nodes(BridgeType, BridgeName, SuccCode) ->
|
lookup_from_all_nodes(BridgeType, BridgeName, SuccCode) ->
|
||||||
case rpc_multicall(lookup_from_local_node, [BridgeType, BridgeName]) of
|
Nodes = mria_mnesia:running_nodes(),
|
||||||
|
case is_ok(emqx_bridge_proto_v1:lookup_from_all_nodes(Nodes, BridgeType, BridgeName)) of
|
||||||
{ok, [{ok, _} | _] = Results} ->
|
{ok, [{ok, _} | _] = Results} ->
|
||||||
{SuccCode, format_bridge_info([R || {ok, R} <- Results])};
|
{SuccCode, format_bridge_info([R || {ok, R} <- Results])};
|
||||||
{ok, [{error, not_found} | _]} ->
|
{ok, [{error, not_found} | _]} ->
|
||||||
|
@ -433,9 +429,8 @@ format_metrics(#{
|
||||||
} }) ->
|
} }) ->
|
||||||
?METRICS(Match, Succ, Failed + Ex, Rate, Rate5m, RateMax).
|
?METRICS(Match, Succ, Failed + Ex, Rate, Rate5m, RateMax).
|
||||||
|
|
||||||
rpc_multicall(Func, Args) ->
|
|
||||||
Nodes = mria_mnesia:running_nodes(),
|
is_ok(ResL) ->
|
||||||
ResL = erpc:multicall(Nodes, ?MODULE, Func, Args, 15000),
|
|
||||||
case lists:filter(fun({ok, _}) -> false; (_) -> true end, ResL) of
|
case lists:filter(fun({ok, _}) -> false; (_) -> true end, ResL) of
|
||||||
[] -> {ok, [Res || {ok, Res} <- ResL]};
|
[] -> {ok, [Res || {ok, Res} <- ResL]};
|
||||||
ErrL -> {error, ErrL}
|
ErrL -> {error, ErrL}
|
||||||
|
@ -446,21 +441,6 @@ filter_out_request_body(Conf) ->
|
||||||
<<"node_metrics">>, <<"metrics">>, <<"node">>],
|
<<"node_metrics">>, <<"metrics">>, <<"node">>],
|
||||||
maps:without(ExtraConfs, Conf).
|
maps:without(ExtraConfs, Conf).
|
||||||
|
|
||||||
rpc_call(Node, Fun, Args) ->
|
|
||||||
rpc_call(Node, ?MODULE, Fun, Args).
|
|
||||||
|
|
||||||
rpc_call(Node, Mod, Fun, Args) when Node =:= node() ->
|
|
||||||
apply(Mod, Fun, Args);
|
|
||||||
rpc_call(Node, Mod, Fun, Args) ->
|
|
||||||
case rpc:call(Node, Mod, Fun, Args) of
|
|
||||||
{badrpc, Reason} -> {error, Reason};
|
|
||||||
Res -> Res
|
|
||||||
end.
|
|
||||||
|
|
||||||
wrap_rpc({badrpc, Reason}) -> {error, Reason};
|
|
||||||
wrap_rpc(Ret) -> Ret.
|
|
||||||
|
|
||||||
|
|
||||||
error_msg(Code, Msg) when is_binary(Msg) ->
|
error_msg(Code, Msg) when is_binary(Msg) ->
|
||||||
#{code => Code, message => Msg};
|
#{code => Code, message => Msg};
|
||||||
error_msg(Code, Msg) ->
|
error_msg(Code, Msg) ->
|
||||||
|
|
|
@ -0,0 +1,43 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-module(emqx_bridge_proto_v1).
|
||||||
|
|
||||||
|
-behaviour(emqx_bpapi).
|
||||||
|
|
||||||
|
-export([ introduced_in/0
|
||||||
|
|
||||||
|
, list_bridges/1
|
||||||
|
, lookup_from_all_nodes/3
|
||||||
|
]).
|
||||||
|
|
||||||
|
-include_lib("emqx/include/bpapi.hrl").
|
||||||
|
|
||||||
|
-define(TIMEOUT, 15000).
|
||||||
|
|
||||||
|
introduced_in() ->
|
||||||
|
"5.0.0".
|
||||||
|
|
||||||
|
-spec list_bridges(node()) -> list() | emqx_rpc:badrpc().
|
||||||
|
list_bridges(Node) ->
|
||||||
|
rpc:call(Node, emqx_bridge, list, [], ?TIMEOUT).
|
||||||
|
|
||||||
|
-type key() :: atom() | binary() | [byte()].
|
||||||
|
|
||||||
|
-spec lookup_from_all_nodes([node()], key(), key()) ->
|
||||||
|
emqx_rpc:erpc_multicall().
|
||||||
|
lookup_from_all_nodes(Nodes, BridgeType, BridgeName) ->
|
||||||
|
erpc:multicall(Nodes, emqx_bridge_api, lookup_from_local_node, [BridgeType, BridgeName], ?TIMEOUT).
|
Loading…
Reference in New Issue