diff --git a/src/emqx_broker.erl b/src/emqx_broker.erl index fee327927..09a2e2279 100644 --- a/src/emqx_broker.erl +++ b/src/emqx_broker.erl @@ -256,7 +256,7 @@ aggre(Routes) -> -spec(forward(node(), emqx_types:topic(), emqx_types:delivery(), RPCMode::sync|async) -> emqx_types:deliver_result()). forward(Node, To, Delivery, async) -> - case emqx_rpc:cast(Node, ?BROKER, dispatch, [To, Delivery]) of + case emqx_rpc:cast(To, Node, ?BROKER, dispatch, [To, Delivery]) of true -> ok; {badrpc, Reason} -> ?LOG(error, "Ansync forward msg to ~s failed: ~p", [Node, Reason]), @@ -264,7 +264,7 @@ forward(Node, To, Delivery, async) -> end; forward(Node, To, Delivery, sync) -> - case emqx_rpc:call(Node, ?BROKER, dispatch, [To, Delivery]) of + case emqx_rpc:call(To, Node, ?BROKER, dispatch, [To, Delivery]) of {badrpc, Reason} -> ?LOG(error, "Sync forward msg to ~s failed: ~p", [Node, Reason]), {error, badrpc}; diff --git a/src/emqx_rpc.erl b/src/emqx_rpc.erl index d577abbdc..02eb614b5 100644 --- a/src/emqx_rpc.erl +++ b/src/emqx_rpc.erl @@ -16,24 +16,41 @@ -module(emqx_rpc). -export([ call/4 + , call/5 , cast/4 + , cast/5 , multicall/4 + , multicall/5 ]). -define(RPC, gen_rpc). +-define(DefaultClientNum, 1). + call(Node, Mod, Fun, Args) -> filter_result(?RPC:call(rpc_node(Node), Mod, Fun, Args)). +call(Key, Node, Mod, Fun, Args) -> + filter_result(?RPC:call(rpc_node({Key, Node}), Mod, Fun, Args)). + multicall(Nodes, Mod, Fun, Args) -> filter_result(?RPC:multicall(rpc_nodes(Nodes), Mod, Fun, Args)). +multicall(Key, Nodes, Mod, Fun, Args) -> + filter_result(?RPC:multicall(rpc_nodes([{Key, Node} || Node <- Nodes]), Mod, Fun, Args)). + cast(Node, Mod, Fun, Args) -> filter_result(?RPC:cast(rpc_node(Node), Mod, Fun, Args)). -rpc_node(Node) -> - ClientNum = application:get_env(gen_rpc, tcp_client_num, 32), - {Node, rand:uniform(ClientNum)}. +cast(Key, Node, Mod, Fun, Args) -> + filter_result(?RPC:cast(rpc_node({Key, Node}), Mod, Fun, Args)). + +rpc_node(Node) when is_atom(Node) -> + ClientNum = application:get_env(gen_rpc, tcp_client_num, ?DefaultClientNum), + {Node, rand:uniform(ClientNum)}; +rpc_node({Key, Node}) when is_atom(Node) -> + ClientNum = application:get_env(gen_rpc, tcp_client_num, ?DefaultClientNum), + {Node, erlang:phash2(Key, ClientNum) + 1}. rpc_nodes(Nodes) -> rpc_nodes(Nodes, []). @@ -43,7 +60,6 @@ rpc_nodes([], Acc) -> rpc_nodes([Node | Nodes], Acc) -> rpc_nodes(Nodes, [rpc_node(Node) | Acc]). - filter_result({Error, Reason}) when Error =:= badrpc; Error =:= badtcp -> {badrpc, Reason};