refactor(emqx_broker): Decorate remote procedure calls

This commit is contained in:
k32 2022-01-07 12:06:50 +01:00
parent de89f7b253
commit 1754afdab0
2 changed files with 10 additions and 17 deletions

View File

@ -275,28 +275,20 @@ aggre(Routes) ->
-spec(forward(node(), emqx_types:topic(), emqx_types:delivery(), RpcMode::sync | async) -spec(forward(node(), emqx_types:topic(), emqx_types:delivery(), RpcMode::sync | async)
-> emqx_types:deliver_result()). -> emqx_types:deliver_result()).
forward(Node, To, Delivery, async) -> forward(Node, To, Delivery, async) ->
case emqx_rpc:cast(To, Node, ?BROKER, dispatch, [To, Delivery]) of true = emqx_broker_proto_v1:forward_async(Node, To, Delivery),
true -> emqx_metrics:inc('messages.forward'); emqx_metrics:inc('messages.forward');
{badrpc, Reason} ->
?SLOG(error, #{
msg => "async_forward_msg_to_node_failed",
node => Node,
reason => Reason
}, #{topic => To}),
{error, badrpc}
end;
forward(Node, To, Delivery, sync) -> forward(Node, To, Delivery, sync) ->
case emqx_rpc:call(To, Node, ?BROKER, dispatch, [To, Delivery]) of case emqx_broker_proto_v1:forward(Node, To, Delivery) of
{badrpc, Reason} -> {Err, Reason} when Err =:= badrpc; Err =:= badtcp ->
?SLOG(error, #{ ?SLOG(error, #{
msg => "sync_forward_msg_to_node_failed", msg => "sync_forward_msg_to_node_failed",
node => Node, node => Node,
reason => Reason Err => Reason
}, #{topic => To}), }, #{topic => To}),
{error, badrpc}; {error, badrpc};
Result -> Result ->
emqx_metrics:inc('messages.forward'), Result emqx_metrics:inc('messages.forward'),
Result
end. end.
-spec(dispatch(emqx_types:topic(), emqx_types:delivery()) -> emqx_types:deliver_result()). -spec(dispatch(emqx_types:topic(), emqx_types:delivery()) -> emqx_types:deliver_result()).

View File

@ -29,10 +29,11 @@
introduced_in() -> introduced_in() ->
"5.0.0". "5.0.0".
-spec forward(node(), emqx_types:topic(), emqx_types:delivery()) -> emqx_types:deliver_result(). -spec forward(node(), emqx_types:topic(), emqx_types:delivery()) -> emqx_types:deliver_result()
| emqx_rpc:badrpc().
forward(Node, Topic, Delivery = #delivery{}) when is_binary(Topic) -> forward(Node, Topic, Delivery = #delivery{}) when is_binary(Topic) ->
emqx_rpc:call(Topic, Node, emqx_broker, dispatch, [Topic, Delivery]). emqx_rpc:call(Topic, Node, emqx_broker, dispatch, [Topic, Delivery]).
-spec forward_async(node(), emqx_types:topic(), emqx_types:delivery()) -> ok. -spec forward_async(node(), emqx_types:topic(), emqx_types:delivery()) -> true.
forward_async(Node, Topic, Delivery = #delivery{}) when is_binary(Topic) -> forward_async(Node, Topic, Delivery = #delivery{}) when is_binary(Topic) ->
emqx_rpc:cast(Topic, Node, emqx_broker, dispatch, [Topic, Delivery]). emqx_rpc:cast(Topic, Node, emqx_broker, dispatch, [Topic, Delivery]).