Ordered messaging via multiple gen_rpc clients
This commit is contained in:
parent
7124a40f3d
commit
b47a4f4422
|
@ -256,7 +256,7 @@ aggre(Routes) ->
|
||||||
-spec(forward(node(), emqx_types:topic(), emqx_types:delivery(), RPCMode::sync|async)
|
-spec(forward(node(), emqx_types:topic(), emqx_types:delivery(), RPCMode::sync|async)
|
||||||
-> emqx_types:deliver_result()).
|
-> emqx_types:deliver_result()).
|
||||||
forward(Node, To, Delivery, async) ->
|
forward(Node, To, Delivery, async) ->
|
||||||
case emqx_rpc:cast(Node, ?BROKER, dispatch, [To, Delivery]) of
|
case emqx_rpc:cast(To, Node, ?BROKER, dispatch, [To, Delivery]) of
|
||||||
true -> ok;
|
true -> ok;
|
||||||
{badrpc, Reason} ->
|
{badrpc, Reason} ->
|
||||||
?LOG(error, "Ansync forward msg to ~s failed: ~p", [Node, Reason]),
|
?LOG(error, "Ansync forward msg to ~s failed: ~p", [Node, Reason]),
|
||||||
|
@ -264,7 +264,7 @@ forward(Node, To, Delivery, async) ->
|
||||||
end;
|
end;
|
||||||
|
|
||||||
forward(Node, To, Delivery, sync) ->
|
forward(Node, To, Delivery, sync) ->
|
||||||
case emqx_rpc:call(Node, ?BROKER, dispatch, [To, Delivery]) of
|
case emqx_rpc:call(To, Node, ?BROKER, dispatch, [To, Delivery]) of
|
||||||
{badrpc, Reason} ->
|
{badrpc, Reason} ->
|
||||||
?LOG(error, "Sync forward msg to ~s failed: ~p", [Node, Reason]),
|
?LOG(error, "Sync forward msg to ~s failed: ~p", [Node, Reason]),
|
||||||
{error, badrpc};
|
{error, badrpc};
|
||||||
|
|
|
@ -16,24 +16,41 @@
|
||||||
-module(emqx_rpc).
|
-module(emqx_rpc).
|
||||||
|
|
||||||
-export([ call/4
|
-export([ call/4
|
||||||
|
, call/5
|
||||||
, cast/4
|
, cast/4
|
||||||
|
, cast/5
|
||||||
, multicall/4
|
, multicall/4
|
||||||
|
, multicall/5
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-define(RPC, gen_rpc).
|
-define(RPC, gen_rpc).
|
||||||
|
|
||||||
|
-define(DefaultClientNum, 1).
|
||||||
|
|
||||||
call(Node, Mod, Fun, Args) ->
|
call(Node, Mod, Fun, Args) ->
|
||||||
filter_result(?RPC:call(rpc_node(Node), Mod, Fun, Args)).
|
filter_result(?RPC:call(rpc_node(Node), Mod, Fun, Args)).
|
||||||
|
|
||||||
|
call(Key, Node, Mod, Fun, Args) ->
|
||||||
|
filter_result(?RPC:call(rpc_node({Key, Node}), Mod, Fun, Args)).
|
||||||
|
|
||||||
multicall(Nodes, Mod, Fun, Args) ->
|
multicall(Nodes, Mod, Fun, Args) ->
|
||||||
filter_result(?RPC:multicall(rpc_nodes(Nodes), Mod, Fun, Args)).
|
filter_result(?RPC:multicall(rpc_nodes(Nodes), Mod, Fun, Args)).
|
||||||
|
|
||||||
|
multicall(Key, Nodes, Mod, Fun, Args) ->
|
||||||
|
filter_result(?RPC:multicall(rpc_nodes([{Key, Node} || Node <- Nodes]), Mod, Fun, Args)).
|
||||||
|
|
||||||
cast(Node, Mod, Fun, Args) ->
|
cast(Node, Mod, Fun, Args) ->
|
||||||
filter_result(?RPC:cast(rpc_node(Node), Mod, Fun, Args)).
|
filter_result(?RPC:cast(rpc_node(Node), Mod, Fun, Args)).
|
||||||
|
|
||||||
rpc_node(Node) ->
|
cast(Key, Node, Mod, Fun, Args) ->
|
||||||
ClientNum = application:get_env(gen_rpc, tcp_client_num, 32),
|
filter_result(?RPC:cast(rpc_node({Key, Node}), Mod, Fun, Args)).
|
||||||
{Node, rand:uniform(ClientNum)}.
|
|
||||||
|
rpc_node(Node) when is_atom(Node) ->
|
||||||
|
ClientNum = application:get_env(gen_rpc, tcp_client_num, ?DefaultClientNum),
|
||||||
|
{Node, rand:uniform(ClientNum)};
|
||||||
|
rpc_node({Key, Node}) when is_atom(Node) ->
|
||||||
|
ClientNum = application:get_env(gen_rpc, tcp_client_num, ?DefaultClientNum),
|
||||||
|
{Node, erlang:phash2(Key, ClientNum) + 1}.
|
||||||
|
|
||||||
rpc_nodes(Nodes) ->
|
rpc_nodes(Nodes) ->
|
||||||
rpc_nodes(Nodes, []).
|
rpc_nodes(Nodes, []).
|
||||||
|
@ -43,7 +60,6 @@ rpc_nodes([], Acc) ->
|
||||||
rpc_nodes([Node | Nodes], Acc) ->
|
rpc_nodes([Node | Nodes], Acc) ->
|
||||||
rpc_nodes(Nodes, [rpc_node(Node) | Acc]).
|
rpc_nodes(Nodes, [rpc_node(Node) | Acc]).
|
||||||
|
|
||||||
|
|
||||||
filter_result({Error, Reason})
|
filter_result({Error, Reason})
|
||||||
when Error =:= badrpc; Error =:= badtcp ->
|
when Error =:= badrpc; Error =:= badtcp ->
|
||||||
{badrpc, Reason};
|
{badrpc, Reason};
|
||||||
|
|
Loading…
Reference in New Issue