diff --git a/apps/emqx/src/bpapi/emqx_bpapi_static_checks.erl b/apps/emqx/src/bpapi/emqx_bpapi_static_checks.erl index 5023e9b9f..fe6673d54 100644 --- a/apps/emqx/src/bpapi/emqx_bpapi_static_checks.erl +++ b/apps/emqx/src/bpapi/emqx_bpapi_static_checks.erl @@ -36,8 +36,9 @@ -type param_types() :: #{emqx_bpapi:var_name() => _Type}. -%% Applications we wish to ignore in the analysis: --define(IGNORED_APPS, "gen_rpc, recon, observer_cli, snabbkaffe, ekka, mria"). +%% Applications and modules we wish to ignore in the analysis: +-define(IGNORED_APPS, "gen_rpc, recon, redbug, observer_cli, snabbkaffe, ekka, mria"). +-define(IGNORED_MODULES, "emqx_rpc"). %% List of known RPC backend modules: -define(RPC_MODULES, "gen_rpc, erpc, rpc, emqx_rpc"). %% List of functions in the RPC backend modules that we can ignore: @@ -162,6 +163,7 @@ dump(Opts) -> DialyzerDump = collect_signatures(PLT, APIDump), Release = emqx_app:get_release(), dump_api(#{api => APIDump, signatures => DialyzerDump, release => Release}), + xref:stop(?XREF), erase(bpapi_ok). prepare(#{reldir := RelDir, plt := PLT}) -> @@ -176,7 +178,7 @@ prepare(#{reldir := RelDir, plt := PLT}) -> dialyzer_plt:from_file(PLT). find_remote_calls(_Opts) -> - Query = "XC | (A - [" ?IGNORED_APPS "]:App) + Query = "XC | (A - [" ?IGNORED_APPS "]:App - [" ?IGNORED_MODULES "] : Mod) || ([" ?RPC_MODULES "] : Mod - " ?IGNORED_RPC_CALLS ")", {ok, Calls} = xref:q(?XREF, Query), ?INFO("Calls to RPC modules ~p", [Calls]), diff --git a/apps/emqx/src/emqx_broker.erl b/apps/emqx/src/emqx_broker.erl index d88a1f595..b93bb92c8 100644 --- a/apps/emqx/src/emqx_broker.erl +++ b/apps/emqx/src/emqx_broker.erl @@ -275,28 +275,20 @@ aggre(Routes) -> -spec(forward(node(), emqx_types:topic(), emqx_types:delivery(), RpcMode::sync | async) -> emqx_types:deliver_result()). forward(Node, To, Delivery, async) -> - case emqx_rpc:cast(To, Node, ?BROKER, dispatch, [To, Delivery]) of - true -> emqx_metrics:inc('messages.forward'); - {badrpc, Reason} -> - ?SLOG(error, #{ - msg => "async_forward_msg_to_node_failed", - node => Node, - reason => Reason - }, #{topic => To}), - {error, badrpc} - end; - + true = emqx_broker_proto_v1:forward_async(Node, To, Delivery), + emqx_metrics:inc('messages.forward'); forward(Node, To, Delivery, sync) -> - case emqx_rpc:call(To, Node, ?BROKER, dispatch, [To, Delivery]) of - {badrpc, Reason} -> + case emqx_broker_proto_v1:forward(Node, To, Delivery) of + {Err, Reason} when Err =:= badrpc; Err =:= badtcp -> ?SLOG(error, #{ msg => "sync_forward_msg_to_node_failed", node => Node, - reason => Reason + Err => Reason }, #{topic => To}), {error, badrpc}; Result -> - emqx_metrics:inc('messages.forward'), Result + emqx_metrics:inc('messages.forward'), + Result end. -spec(dispatch(emqx_types:topic(), emqx_types:delivery()) -> emqx_types:deliver_result()). diff --git a/apps/emqx/src/emqx_rpc.erl b/apps/emqx/src/emqx_rpc.erl index 516e8f685..1c1a921f6 100644 --- a/apps/emqx/src/emqx_rpc.erl +++ b/apps/emqx/src/emqx_rpc.erl @@ -27,6 +27,12 @@ , multicall/5 ]). +-export_type([ badrpc/0 + , call_result/0 + , cast_result/0 + , multicall_result/0 + ]). + -compile({inline, [ rpc_node/1 , rpc_nodes/1 @@ -34,23 +40,37 @@ -define(DefaultClientNum, 1). +-type badrpc() :: {badrpc, term()} | {badtcp, term()}. + +-type call_result() :: term() | badrpc(). + +-type cast_result() :: true. + +-type multicall_result() :: {_Results :: [term()], _BadNodes :: [node()]}. + +-spec call(node(), module(), atom(), list()) -> call_result(). call(Node, Mod, Fun, Args) -> filter_result(gen_rpc:call(rpc_node(Node), Mod, Fun, Args)). +-spec call(term(), node(), module(), atom(), list()) -> call_result(). call(Key, Node, Mod, Fun, Args) -> filter_result(gen_rpc:call(rpc_node({Key, Node}), Mod, Fun, Args)). +-spec multicall([node()], module(), atom(), list()) -> multicall_result(). multicall(Nodes, Mod, Fun, Args) -> - filter_result(gen_rpc:multicall(rpc_nodes(Nodes), Mod, Fun, Args)). + gen_rpc:multicall(rpc_nodes(Nodes), Mod, Fun, Args). +-spec multicall(term(), [node()], module(), atom(), list()) -> multicall_result(). multicall(Key, Nodes, Mod, Fun, Args) -> - filter_result(gen_rpc:multicall(rpc_nodes([{Key, Node} || Node <- Nodes]), Mod, Fun, Args)). + gen_rpc:multicall(rpc_nodes([{Key, Node} || Node <- Nodes]), Mod, Fun, Args). +-spec cast(node(), module(), atom(), list()) -> cast_result(). cast(Node, Mod, Fun, Args) -> - filter_result(gen_rpc:cast(rpc_node(Node), Mod, Fun, Args)). + gen_rpc:cast(rpc_node(Node), Mod, Fun, Args). +-spec cast(term(), node(), module(), atom(), list()) -> cast_result(). cast(Key, Node, Mod, Fun, Args) -> - filter_result(gen_rpc:cast(rpc_node({Key, Node}), Mod, Fun, Args)). + gen_rpc:cast(rpc_node({Key, Node}), Mod, Fun, Args). rpc_node(Node) when is_atom(Node) -> {Node, rand:uniform(max_client_num())}; diff --git a/apps/emqx/src/proto/emqx_broker_proto_v1.erl b/apps/emqx/src/proto/emqx_broker_proto_v1.erl index d5725b8ac..7643fa40b 100644 --- a/apps/emqx/src/proto/emqx_broker_proto_v1.erl +++ b/apps/emqx/src/proto/emqx_broker_proto_v1.erl @@ -29,10 +29,11 @@ introduced_in() -> "5.0.0". --spec forward(node(), emqx_types:topic(), emqx_types:delivery()) -> emqx_types:deliver_result(). +-spec forward(node(), emqx_types:topic(), emqx_types:delivery()) -> emqx_types:deliver_result() + | emqx_rpc:badrpc(). forward(Node, Topic, Delivery = #delivery{}) when is_binary(Topic) -> emqx_rpc:call(Topic, Node, emqx_broker, dispatch, [Topic, Delivery]). --spec forward_async(node(), emqx_types:topic(), emqx_types:delivery()) -> ok. +-spec forward_async(node(), emqx_types:topic(), emqx_types:delivery()) -> true. forward_async(Node, Topic, Delivery = #delivery{}) when is_binary(Topic) -> emqx_rpc:cast(Topic, Node, emqx_broker, dispatch, [Topic, Delivery]). diff --git a/apps/emqx/test/props/prop_emqx_rpc.erl b/apps/emqx/test/props/prop_emqx_rpc.erl index 7c2c61a5d..fb8447fe1 100644 --- a/apps/emqx/test/props/prop_emqx_rpc.erl +++ b/apps/emqx/test/props/prop_emqx_rpc.erl @@ -60,7 +60,6 @@ prop_nodes() -> begin Nodes = punch(Nodes0), case emqx_rpc:multicall(Nodes, erlang, system_time, []) of - {badrpc, _Reason} -> true; {RealResults, RealBadNodes} when is_list(RealResults); is_list(RealBadNodes) -> @@ -74,7 +73,6 @@ prop_nodes_with_key() -> begin Nodes = punch(Nodes0), case emqx_rpc:multicall(Key, Nodes, erlang, system_time, []) of - {badrpc, _Reason} -> true; {RealResults, RealBadNodes} when is_list(RealResults); is_list(RealBadNodes) ->