From 1754afdab098f25579d57b3a51aa630c8d4be122 Mon Sep 17 00:00:00 2001 From: k32 <10274441+k32@users.noreply.github.com> Date: Fri, 7 Jan 2022 12:06:50 +0100 Subject: [PATCH] refactor(emqx_broker): Decorate remote procedure calls --- apps/emqx/src/emqx_broker.erl | 22 +++++++------------- apps/emqx/src/proto/emqx_broker_proto_v1.erl | 5 +++-- 2 files changed, 10 insertions(+), 17 deletions(-) diff --git a/apps/emqx/src/emqx_broker.erl b/apps/emqx/src/emqx_broker.erl index bdb6acd4f..0023beefa 100644 --- a/apps/emqx/src/emqx_broker.erl +++ b/apps/emqx/src/emqx_broker.erl @@ -275,28 +275,20 @@ aggre(Routes) -> -spec(forward(node(), emqx_types:topic(), emqx_types:delivery(), RpcMode::sync | async) -> emqx_types:deliver_result()). forward(Node, To, Delivery, async) -> - case emqx_rpc:cast(To, Node, ?BROKER, dispatch, [To, Delivery]) of - true -> emqx_metrics:inc('messages.forward'); - {badrpc, Reason} -> - ?SLOG(error, #{ - msg => "async_forward_msg_to_node_failed", - node => Node, - reason => Reason - }, #{topic => To}), - {error, badrpc} - end; - + true = emqx_broker_proto_v1:forward_async(Node, To, Delivery), + emqx_metrics:inc('messages.forward'); forward(Node, To, Delivery, sync) -> - case emqx_rpc:call(To, Node, ?BROKER, dispatch, [To, Delivery]) of - {badrpc, Reason} -> + case emqx_broker_proto_v1:forward(Node, To, Delivery) of + {Err, Reason} when Err =:= badrpc; Err =:= badtcp -> ?SLOG(error, #{ msg => "sync_forward_msg_to_node_failed", node => Node, - reason => Reason + Err => Reason }, #{topic => To}), {error, badrpc}; Result -> - emqx_metrics:inc('messages.forward'), Result + emqx_metrics:inc('messages.forward'), + Result end. -spec(dispatch(emqx_types:topic(), emqx_types:delivery()) -> emqx_types:deliver_result()). diff --git a/apps/emqx/src/proto/emqx_broker_proto_v1.erl b/apps/emqx/src/proto/emqx_broker_proto_v1.erl index d5725b8ac..7643fa40b 100644 --- a/apps/emqx/src/proto/emqx_broker_proto_v1.erl +++ b/apps/emqx/src/proto/emqx_broker_proto_v1.erl @@ -29,10 +29,11 @@ introduced_in() -> "5.0.0". --spec forward(node(), emqx_types:topic(), emqx_types:delivery()) -> emqx_types:deliver_result(). +-spec forward(node(), emqx_types:topic(), emqx_types:delivery()) -> emqx_types:deliver_result() + | emqx_rpc:badrpc(). forward(Node, Topic, Delivery = #delivery{}) when is_binary(Topic) -> emqx_rpc:call(Topic, Node, emqx_broker, dispatch, [Topic, Delivery]). --spec forward_async(node(), emqx_types:topic(), emqx_types:delivery()) -> ok. +-spec forward_async(node(), emqx_types:topic(), emqx_types:delivery()) -> true. forward_async(Node, Topic, Delivery = #delivery{}) when is_binary(Topic) -> emqx_rpc:cast(Topic, Node, emqx_broker, dispatch, [Topic, Delivery]).