diff --git a/src/emqx_broker.erl b/src/emqx_broker.erl index 8b6693173..951606183 100644 --- a/src/emqx_broker.erl +++ b/src/emqx_broker.erl @@ -258,7 +258,7 @@ aggre(Routes) -> -spec(forward(node(), emqx_types:topic(), emqx_types:delivery(), RPCMode::sync|async) -> emqx_types:deliver_result()). forward(Node, To, Delivery, async) -> - case emqx_rpc:cast(Node, ?BROKER, dispatch, [To, Delivery]) of + case emqx_rpc:cast(To, Node, ?BROKER, dispatch, [To, Delivery]) of true -> ok; {badrpc, Reason} -> ?LOG(error, "Ansync forward msg to ~s failed: ~p", [Node, Reason]), @@ -266,7 +266,7 @@ forward(Node, To, Delivery, async) -> end; forward(Node, To, Delivery, sync) -> - case emqx_rpc:call(Node, ?BROKER, dispatch, [To, Delivery]) of + case emqx_rpc:call(To, Node, ?BROKER, dispatch, [To, Delivery]) of {badrpc, Reason} -> ?LOG(error, "Sync forward msg to ~s failed: ~p", [Node, Reason]), {error, badrpc}; diff --git a/src/emqx_rpc.erl b/src/emqx_rpc.erl index 6af676802..9eb333a7f 100644 --- a/src/emqx_rpc.erl +++ b/src/emqx_rpc.erl @@ -18,8 +18,11 @@ -module(emqx_rpc). -export([ call/4 + , call/5 , cast/4 + , cast/5 , multicall/4 + , multicall/5 ]). -compile({inline, @@ -34,15 +37,27 @@ call(Node, Mod, Fun, Args) -> filter_result(?RPC:call(rpc_node(Node), Mod, Fun, Args)). +call(Key, Node, Mod, Fun, Args) -> + filter_result(?RPC:call(rpc_node({Key, Node}), Mod, Fun, Args)). + multicall(Nodes, Mod, Fun, Args) -> filter_result(?RPC:multicall(rpc_nodes(Nodes), Mod, Fun, Args)). +multicall(Key, Nodes, Mod, Fun, Args) -> + filter_result(?RPC:multicall(rpc_nodes([{Key, Node} || Node <- Nodes]), Mod, Fun, Args)). + cast(Node, Mod, Fun, Args) -> filter_result(?RPC:cast(rpc_node(Node), Mod, Fun, Args)). -rpc_node(Node) -> +cast(Key, Node, Mod, Fun, Args) -> + filter_result(?RPC:cast(rpc_node({Key, Node}), Mod, Fun, Args)). + +rpc_node(Node) when is_atom(Node) -> ClientNum = application:get_env(gen_rpc, tcp_client_num, ?DefaultClientNum), - {Node, rand:uniform(ClientNum)}. + {Node, rand:uniform(ClientNum)}; +rpc_node({Key, Node}) when is_atom(Node) -> + ClientNum = application:get_env(gen_rpc, tcp_client_num, ?DefaultClientNum), + {Node, erlang:phash2(Key, ClientNum) + 1}. rpc_nodes(Nodes) -> rpc_nodes(Nodes, []). diff --git a/test/emqx_rpc_SUITE.erl b/test/emqx_rpc_SUITE.erl index 99c5c2532..6b24e3de0 100644 --- a/test/emqx_rpc_SUITE.erl +++ b/test/emqx_rpc_SUITE.erl @@ -24,17 +24,6 @@ all() -> emqx_ct:all(?MODULE). -t_multicall(_) -> - error('TODO'). - -t_cast(_) -> - error('TODO'). - -t_call(_) -> - error('TODO'). - - - t_prop_rpc(_) -> ok = load(), Opts = [{to_file, user}, {numtests, 10}], @@ -42,7 +31,9 @@ t_prop_rpc(_) -> ok = application:set_env(gen_rpc, call_receive_timeout, 1), ok = emqx_logger:set_log_level(emergency), ?assert(proper:quickcheck(prop_node(), Opts)), + ?assert(proper:quickcheck(prop_node_with_key(), Opts)), ?assert(proper:quickcheck(prop_nodes(), Opts)), + ?assert(proper:quickcheck(prop_nodes_with_key(), Opts)), ok = application:stop(gen_rpc), ok = unload(). @@ -57,6 +48,17 @@ prop_node() -> end end). +prop_node_with_key() -> + ?FORALL({Node, Key}, nodename_with_key(), + begin + ?assert(emqx_rpc:cast(Key, Node, erlang, system_time, [])), + case emqx_rpc:call(Key, Node, erlang, system_time, []) of + {badrpc, _Reason} -> true; + Delivery when is_integer(Delivery) -> true; + _Other -> false + end + end). + prop_nodes() -> ?FORALL(Nodes, nodesname(), begin @@ -70,6 +72,19 @@ prop_nodes() -> end end). +prop_nodes_with_key() -> + ?FORALL({Nodes, Key}, nodesname_with_key(), + begin + case emqx_rpc:multicall(Key, Nodes, erlang, system_time, []) of + {badrpc, _Reason} -> true; + {RealResults, RealBadNodes} + when is_list(RealResults); + is_list(RealBadNodes) -> + true; + _Other -> false + end + end). + %%-------------------------------------------------------------------- %% helper %%-------------------------------------------------------------------- @@ -96,8 +111,19 @@ nodename() -> list_to_atom(Node) end). +nodename_with_key() -> + ?LET({NodePrefix, HostName, Key}, + {node_prefix(), hostname(), choose(0, 10)}, + begin + Node = NodePrefix ++ "@" ++ HostName, + {list_to_atom(Node), Key} + end). + nodesname() -> - oneof([list(nodename()), ["emqxct@127.0.0.1"]]). + oneof([list(nodename()), ['emqxct@127.0.0.1']]). + +nodesname_with_key() -> + oneof([{list(nodename()), choose(0, 10)}, {['emqxct@127.0.0.1'], 1}]). node_prefix() -> oneof(["emqxct", text_like()]).