refactor(emqx_exhook): Decorate RPCs
This commit is contained in:
parent
455606fc0e
commit
7061e6eefe
|
@ -25,6 +25,8 @@
|
|||
, cast/5
|
||||
, multicall/4
|
||||
, multicall/5
|
||||
|
||||
, unwrap_erpc/1
|
||||
]).
|
||||
|
||||
-export_type([ badrpc/0
|
||||
|
@ -106,3 +108,15 @@ filter_result(Delivery) ->
|
|||
|
||||
max_client_num() ->
|
||||
emqx:get_config([rpc, tcp_client_num], ?DefaultClientNum).
|
||||
|
||||
-spec unwrap_erpc(emqx_rpc:erpc(A)) -> A | {error, _Err}.
|
||||
unwrap_erpc({ok, A}) ->
|
||||
A;
|
||||
unwrap_erpc({throw, A}) ->
|
||||
{error, A};
|
||||
unwrap_erpc({error, {exception, Err, _Stack}}) ->
|
||||
{error, Err};
|
||||
unwrap_erpc({error, {exit, Err}}) ->
|
||||
{error, Err};
|
||||
unwrap_erpc({error, {erpc, Err}}) ->
|
||||
{error, Err}.
|
||||
|
|
|
@ -51,7 +51,7 @@
|
|||
%% List of known functions also known to do RPC:
|
||||
-define(RPC_FUNCTIONS, "emqx_cluster_rpc:multicall/3, emqx_cluster_rpc:multicall/5").
|
||||
%% List of functions in the RPC backend modules that we can ignore:
|
||||
-define(IGNORED_RPC_CALLS, "gen_rpc:nodes/0").
|
||||
-define(IGNORED_RPC_CALLS, "gen_rpc:nodes/0, emqx_rpc:unwrap_erpc/1").
|
||||
%% List of business-layer functions that are exempt from the checks:
|
||||
-define(EXEMPTIONS, "emqx_mgmt_api:do_query/6" % Reason: legacy code. A fun and a QC query are
|
||||
% passed in the args, it's futile to try to statically
|
||||
|
@ -213,7 +213,7 @@ prepare(#{reldir := RelDir, plt := PLT}) ->
|
|||
|
||||
find_remote_calls(_Opts) ->
|
||||
Query = "XC | (A - [" ?IGNORED_APPS "]:App - [" ?IGNORED_MODULES "]:Mod - [" ?EXEMPTIONS "])
|
||||
|| (([" ?RPC_MODULES "] : Mod + [" ?RPC_FUNCTIONS "]) - " ?IGNORED_RPC_CALLS ")",
|
||||
|| (([" ?RPC_MODULES "] : Mod + [" ?RPC_FUNCTIONS "]) - [" ?IGNORED_RPC_CALLS "])",
|
||||
{ok, Calls} = xref:q(?XREF, Query),
|
||||
?INFO("Calls to RPC modules ~p", [Calls]),
|
||||
{Callers, _Callees} = lists:unzip(Calls),
|
||||
|
|
|
@ -32,9 +32,6 @@
|
|||
-define(BAD_REQUEST, 'BAD_REQUEST').
|
||||
-define(BAD_RPC, 'BAD_RPC').
|
||||
|
||||
-type rpc_result() :: {error, any()}
|
||||
| any().
|
||||
|
||||
-dialyzer([{nowarn_function, [ fill_cluster_server_info/5
|
||||
, nodes_server_info/5
|
||||
, fill_server_hooks_info/4
|
||||
|
@ -285,7 +282,7 @@ get_nodes_server_info(Name) ->
|
|||
%% GET /exhooks
|
||||
%%--------------------------------------------------------------------
|
||||
nodes_all_server_info(ConfL) ->
|
||||
AllInfos = call_cluster(emqx_exhook_mgr, all_servers_info, []),
|
||||
AllInfos = call_cluster(fun(Nodes) -> emqx_exhook_proto_v1:all_servers_info(Nodes) end),
|
||||
Default = emqx_exhook_metrics:new_metrics_info(),
|
||||
node_all_server_info(ConfL, AllInfos, Default, []).
|
||||
|
||||
|
@ -324,7 +321,7 @@ fill_cluster_server_info([], StatusL, MetricsL, ServerName, _) ->
|
|||
%% GET /exhooks/{name}
|
||||
%%--------------------------------------------------------------------
|
||||
nodes_server_info(Name) ->
|
||||
InfoL = call_cluster(emqx_exhook_mgr, server_info, [Name]),
|
||||
InfoL = call_cluster(fun(Nodes) -> emqx_exhook_proto_v1:server_info(Nodes, Name) end),
|
||||
Default = emqx_exhook_metrics:new_metrics_info(),
|
||||
nodes_server_info(InfoL, Name, Default, [], []).
|
||||
|
||||
|
@ -359,7 +356,7 @@ get_nodes_server_hooks_info(Name) ->
|
|||
case emqx_exhook_mgr:hooks(Name) of
|
||||
[] -> [];
|
||||
Hooks ->
|
||||
AllInfos = call_cluster(emqx_exhook_mgr, server_hooks_metrics, [Name]),
|
||||
AllInfos = call_cluster(fun(Nodes) -> emqx_exhook_proto_v1:server_hooks_metrics(Nodes, Name) end),
|
||||
Default = emqx_exhook_metrics:new_metrics_info(),
|
||||
get_nodes_server_hooks_info(Hooks, AllInfos, Default, [])
|
||||
end.
|
||||
|
@ -387,16 +384,10 @@ fill_server_hooks_info([], _Name, _Default, MetricsL) ->
|
|||
%%--------------------------------------------------------------------
|
||||
%% cluster call
|
||||
%%--------------------------------------------------------------------
|
||||
call_cluster(Module, Fun, Args) ->
|
||||
|
||||
-spec call_cluster(fun(([node()]) -> emqx_rpc:erpc_multicall(A))) ->
|
||||
[{node(), A | {error, _Err}}].
|
||||
call_cluster(Fun) ->
|
||||
Nodes = mria_mnesia:running_nodes(),
|
||||
[{Node, rpc_call(Node, Module, Fun, Args)} || Node <- Nodes].
|
||||
|
||||
-spec rpc_call(node(), atom(), atom(), list()) -> rpc_result().
|
||||
rpc_call(Node, Module, Fun, Args) when Node =:= node() ->
|
||||
erlang:apply(Module, Fun, Args);
|
||||
|
||||
rpc_call(Node, Module, Fun, Args) ->
|
||||
case rpc:call(Node, Module, Fun, Args) of
|
||||
{badrpc, Reason} -> {error, Reason};
|
||||
Res -> Res
|
||||
end.
|
||||
Ret = Fun(Nodes),
|
||||
lists:zip(Nodes, lists:map(fun emqx_rpc:unwrap_erpc/1, Ret)).
|
||||
|
|
|
@ -0,0 +1,46 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_exhook_proto_v1).
|
||||
|
||||
-behaviour(emqx_bpapi).
|
||||
|
||||
-export([ introduced_in/0
|
||||
|
||||
, all_servers_info/1
|
||||
, server_info/2
|
||||
, server_hooks_metrics/2
|
||||
]).
|
||||
|
||||
-include_lib("emqx/include/bpapi.hrl").
|
||||
|
||||
introduced_in() ->
|
||||
"5.0.0".
|
||||
|
||||
-spec all_servers_info([node()]) ->
|
||||
emqx_rpc:erpc_multicall(map()).
|
||||
all_servers_info(Nodes) ->
|
||||
erpc:multicall(Nodes, emqx_exhook_mgr, all_servers_info, []).
|
||||
|
||||
-spec server_info([node()], emqx_exhook_mgr:server_name()) ->
|
||||
emqx_rpc:erpc_multicall(map()).
|
||||
server_info(Nodes, Name) ->
|
||||
erpc:multicall(Nodes, emqx_exhook_mgr, server_info, [Name]).
|
||||
|
||||
-spec server_hooks_metrics([node()], emqx_exhook_mgr:server_name()) ->
|
||||
emqx_rpc:erpc_multicall(emqx_exhook_metrics:hooks_metrics()).
|
||||
server_hooks_metrics(Nodes, Name) ->
|
||||
erpc:multicall(Nodes, emqx_exhook_mgr, server_hooks_metrics, [Name]).
|
Loading…
Reference in New Issue