diff --git a/apps/emqx/src/emqx_broker.erl b/apps/emqx/src/emqx_broker.erl index bdb6acd4f..0023beefa 100644 --- a/apps/emqx/src/emqx_broker.erl +++ b/apps/emqx/src/emqx_broker.erl @@ -275,28 +275,20 @@ aggre(Routes) -> -spec(forward(node(), emqx_types:topic(), emqx_types:delivery(), RpcMode::sync | async) -> emqx_types:deliver_result()). forward(Node, To, Delivery, async) -> - case emqx_rpc:cast(To, Node, ?BROKER, dispatch, [To, Delivery]) of - true -> emqx_metrics:inc('messages.forward'); - {badrpc, Reason} -> - ?SLOG(error, #{ - msg => "async_forward_msg_to_node_failed", - node => Node, - reason => Reason - }, #{topic => To}), - {error, badrpc} - end; - + true = emqx_broker_proto_v1:forward_async(Node, To, Delivery), + emqx_metrics:inc('messages.forward'); forward(Node, To, Delivery, sync) -> - case emqx_rpc:call(To, Node, ?BROKER, dispatch, [To, Delivery]) of - {badrpc, Reason} -> + case emqx_broker_proto_v1:forward(Node, To, Delivery) of + {Err, Reason} when Err =:= badrpc; Err =:= badtcp -> ?SLOG(error, #{ msg => "sync_forward_msg_to_node_failed", node => Node, - reason => Reason + Err => Reason }, #{topic => To}), {error, badrpc}; Result -> - emqx_metrics:inc('messages.forward'), Result + emqx_metrics:inc('messages.forward'), + Result end. -spec(dispatch(emqx_types:topic(), emqx_types:delivery()) -> emqx_types:deliver_result()). diff --git a/apps/emqx/src/proto/emqx_broker_proto_v1.erl b/apps/emqx/src/proto/emqx_broker_proto_v1.erl index d5725b8ac..7643fa40b 100644 --- a/apps/emqx/src/proto/emqx_broker_proto_v1.erl +++ b/apps/emqx/src/proto/emqx_broker_proto_v1.erl @@ -29,10 +29,11 @@ introduced_in() -> "5.0.0". --spec forward(node(), emqx_types:topic(), emqx_types:delivery()) -> emqx_types:deliver_result(). +-spec forward(node(), emqx_types:topic(), emqx_types:delivery()) -> emqx_types:deliver_result() + | emqx_rpc:badrpc(). forward(Node, Topic, Delivery = #delivery{}) when is_binary(Topic) -> emqx_rpc:call(Topic, Node, emqx_broker, dispatch, [Topic, Delivery]). --spec forward_async(node(), emqx_types:topic(), emqx_types:delivery()) -> ok. +-spec forward_async(node(), emqx_types:topic(), emqx_types:delivery()) -> true. forward_async(Node, Topic, Delivery = #delivery{}) when is_binary(Topic) -> emqx_rpc:cast(Topic, Node, emqx_broker, dispatch, [Topic, Delivery]).