Merge pull request #6808 from k32/bpapi-exhook
refactor(emqx_exhook): Decorate RPCs
This commit is contained in:
commit
5411f6ff80
|
@ -25,6 +25,8 @@
|
||||||
, cast/5
|
, cast/5
|
||||||
, multicall/4
|
, multicall/4
|
||||||
, multicall/5
|
, multicall/5
|
||||||
|
|
||||||
|
, unwrap_erpc/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export_type([ badrpc/0
|
-export_type([ badrpc/0
|
||||||
|
@ -106,3 +108,15 @@ filter_result(Delivery) ->
|
||||||
|
|
||||||
max_client_num() ->
|
max_client_num() ->
|
||||||
emqx:get_config([rpc, tcp_client_num], ?DefaultClientNum).
|
emqx:get_config([rpc, tcp_client_num], ?DefaultClientNum).
|
||||||
|
|
||||||
|
-spec unwrap_erpc(emqx_rpc:erpc(A)) -> A | {error, _Err}.
|
||||||
|
unwrap_erpc({ok, A}) ->
|
||||||
|
A;
|
||||||
|
unwrap_erpc({throw, A}) ->
|
||||||
|
{error, A};
|
||||||
|
unwrap_erpc({error, {exception, Err, _Stack}}) ->
|
||||||
|
{error, Err};
|
||||||
|
unwrap_erpc({error, {exit, Err}}) ->
|
||||||
|
{error, Err};
|
||||||
|
unwrap_erpc({error, {erpc, Err}}) ->
|
||||||
|
{error, Err}.
|
||||||
|
|
|
@ -51,7 +51,7 @@
|
||||||
%% List of known functions also known to do RPC:
|
%% List of known functions also known to do RPC:
|
||||||
-define(RPC_FUNCTIONS, "emqx_cluster_rpc:multicall/3, emqx_cluster_rpc:multicall/5").
|
-define(RPC_FUNCTIONS, "emqx_cluster_rpc:multicall/3, emqx_cluster_rpc:multicall/5").
|
||||||
%% List of functions in the RPC backend modules that we can ignore:
|
%% List of functions in the RPC backend modules that we can ignore:
|
||||||
-define(IGNORED_RPC_CALLS, "gen_rpc:nodes/0").
|
-define(IGNORED_RPC_CALLS, "gen_rpc:nodes/0, emqx_rpc:unwrap_erpc/1").
|
||||||
%% List of business-layer functions that are exempt from the checks:
|
%% List of business-layer functions that are exempt from the checks:
|
||||||
-define(EXEMPTIONS, "emqx_mgmt_api:do_query/6" % Reason: legacy code. A fun and a QC query are
|
-define(EXEMPTIONS, "emqx_mgmt_api:do_query/6" % Reason: legacy code. A fun and a QC query are
|
||||||
% passed in the args, it's futile to try to statically
|
% passed in the args, it's futile to try to statically
|
||||||
|
@ -213,7 +213,7 @@ prepare(#{reldir := RelDir, plt := PLT}) ->
|
||||||
|
|
||||||
find_remote_calls(_Opts) ->
|
find_remote_calls(_Opts) ->
|
||||||
Query = "XC | (A - [" ?IGNORED_APPS "]:App - [" ?IGNORED_MODULES "]:Mod - [" ?EXEMPTIONS "])
|
Query = "XC | (A - [" ?IGNORED_APPS "]:App - [" ?IGNORED_MODULES "]:Mod - [" ?EXEMPTIONS "])
|
||||||
|| (([" ?RPC_MODULES "] : Mod + [" ?RPC_FUNCTIONS "]) - " ?IGNORED_RPC_CALLS ")",
|
|| (([" ?RPC_MODULES "] : Mod + [" ?RPC_FUNCTIONS "]) - [" ?IGNORED_RPC_CALLS "])",
|
||||||
{ok, Calls} = xref:q(?XREF, Query),
|
{ok, Calls} = xref:q(?XREF, Query),
|
||||||
?INFO("Calls to RPC modules ~p", [Calls]),
|
?INFO("Calls to RPC modules ~p", [Calls]),
|
||||||
{Callers, _Callees} = lists:unzip(Calls),
|
{Callers, _Callees} = lists:unzip(Calls),
|
||||||
|
|
|
@ -32,9 +32,6 @@
|
||||||
-define(BAD_REQUEST, 'BAD_REQUEST').
|
-define(BAD_REQUEST, 'BAD_REQUEST').
|
||||||
-define(BAD_RPC, 'BAD_RPC').
|
-define(BAD_RPC, 'BAD_RPC').
|
||||||
|
|
||||||
-type rpc_result() :: {error, any()}
|
|
||||||
| any().
|
|
||||||
|
|
||||||
-dialyzer([{nowarn_function, [ fill_cluster_server_info/5
|
-dialyzer([{nowarn_function, [ fill_cluster_server_info/5
|
||||||
, nodes_server_info/5
|
, nodes_server_info/5
|
||||||
, fill_server_hooks_info/4
|
, fill_server_hooks_info/4
|
||||||
|
@ -285,7 +282,7 @@ get_nodes_server_info(Name) ->
|
||||||
%% GET /exhooks
|
%% GET /exhooks
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
nodes_all_server_info(ConfL) ->
|
nodes_all_server_info(ConfL) ->
|
||||||
AllInfos = call_cluster(emqx_exhook_mgr, all_servers_info, []),
|
AllInfos = call_cluster(fun(Nodes) -> emqx_exhook_proto_v1:all_servers_info(Nodes) end),
|
||||||
Default = emqx_exhook_metrics:new_metrics_info(),
|
Default = emqx_exhook_metrics:new_metrics_info(),
|
||||||
node_all_server_info(ConfL, AllInfos, Default, []).
|
node_all_server_info(ConfL, AllInfos, Default, []).
|
||||||
|
|
||||||
|
@ -324,7 +321,7 @@ fill_cluster_server_info([], StatusL, MetricsL, ServerName, _) ->
|
||||||
%% GET /exhooks/{name}
|
%% GET /exhooks/{name}
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
nodes_server_info(Name) ->
|
nodes_server_info(Name) ->
|
||||||
InfoL = call_cluster(emqx_exhook_mgr, server_info, [Name]),
|
InfoL = call_cluster(fun(Nodes) -> emqx_exhook_proto_v1:server_info(Nodes, Name) end),
|
||||||
Default = emqx_exhook_metrics:new_metrics_info(),
|
Default = emqx_exhook_metrics:new_metrics_info(),
|
||||||
nodes_server_info(InfoL, Name, Default, [], []).
|
nodes_server_info(InfoL, Name, Default, [], []).
|
||||||
|
|
||||||
|
@ -359,7 +356,7 @@ get_nodes_server_hooks_info(Name) ->
|
||||||
case emqx_exhook_mgr:hooks(Name) of
|
case emqx_exhook_mgr:hooks(Name) of
|
||||||
[] -> [];
|
[] -> [];
|
||||||
Hooks ->
|
Hooks ->
|
||||||
AllInfos = call_cluster(emqx_exhook_mgr, server_hooks_metrics, [Name]),
|
AllInfos = call_cluster(fun(Nodes) -> emqx_exhook_proto_v1:server_hooks_metrics(Nodes, Name) end),
|
||||||
Default = emqx_exhook_metrics:new_metrics_info(),
|
Default = emqx_exhook_metrics:new_metrics_info(),
|
||||||
get_nodes_server_hooks_info(Hooks, AllInfos, Default, [])
|
get_nodes_server_hooks_info(Hooks, AllInfos, Default, [])
|
||||||
end.
|
end.
|
||||||
|
@ -387,16 +384,10 @@ fill_server_hooks_info([], _Name, _Default, MetricsL) ->
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% cluster call
|
%% cluster call
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
call_cluster(Module, Fun, Args) ->
|
|
||||||
|
-spec call_cluster(fun(([node()]) -> emqx_rpc:erpc_multicall(A))) ->
|
||||||
|
[{node(), A | {error, _Err}}].
|
||||||
|
call_cluster(Fun) ->
|
||||||
Nodes = mria_mnesia:running_nodes(),
|
Nodes = mria_mnesia:running_nodes(),
|
||||||
[{Node, rpc_call(Node, Module, Fun, Args)} || Node <- Nodes].
|
Ret = Fun(Nodes),
|
||||||
|
lists:zip(Nodes, lists:map(fun emqx_rpc:unwrap_erpc/1, Ret)).
|
||||||
-spec rpc_call(node(), atom(), atom(), list()) -> rpc_result().
|
|
||||||
rpc_call(Node, Module, Fun, Args) when Node =:= node() ->
|
|
||||||
erlang:apply(Module, Fun, Args);
|
|
||||||
|
|
||||||
rpc_call(Node, Module, Fun, Args) ->
|
|
||||||
case rpc:call(Node, Module, Fun, Args) of
|
|
||||||
{badrpc, Reason} -> {error, Reason};
|
|
||||||
Res -> Res
|
|
||||||
end.
|
|
||||||
|
|
|
@ -0,0 +1,46 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-module(emqx_exhook_proto_v1).
|
||||||
|
|
||||||
|
-behaviour(emqx_bpapi).
|
||||||
|
|
||||||
|
-export([ introduced_in/0
|
||||||
|
|
||||||
|
, all_servers_info/1
|
||||||
|
, server_info/2
|
||||||
|
, server_hooks_metrics/2
|
||||||
|
]).
|
||||||
|
|
||||||
|
-include_lib("emqx/include/bpapi.hrl").
|
||||||
|
|
||||||
|
introduced_in() ->
|
||||||
|
"5.0.0".
|
||||||
|
|
||||||
|
-spec all_servers_info([node()]) ->
|
||||||
|
emqx_rpc:erpc_multicall(map()).
|
||||||
|
all_servers_info(Nodes) ->
|
||||||
|
erpc:multicall(Nodes, emqx_exhook_mgr, all_servers_info, []).
|
||||||
|
|
||||||
|
-spec server_info([node()], emqx_exhook_mgr:server_name()) ->
|
||||||
|
emqx_rpc:erpc_multicall(map()).
|
||||||
|
server_info(Nodes, Name) ->
|
||||||
|
erpc:multicall(Nodes, emqx_exhook_mgr, server_info, [Name]).
|
||||||
|
|
||||||
|
-spec server_hooks_metrics([node()], emqx_exhook_mgr:server_name()) ->
|
||||||
|
emqx_rpc:erpc_multicall(emqx_exhook_metrics:hooks_metrics()).
|
||||||
|
server_hooks_metrics(Nodes, Name) ->
|
||||||
|
erpc:multicall(Nodes, emqx_exhook_mgr, server_hooks_metrics, [Name]).
|
Loading…
Reference in New Issue