feature(worker): relay function calls by workers
This commit is contained in:
parent
f15e807f76
commit
dae9633cd5
|
@ -0,0 +1,3 @@
|
|||
-type callback() :: fun((any()) -> any()).
|
||||
-type action() :: fun((pid()) -> any()).
|
||||
-type exec_mode() :: relay | relay_async | {relay_async, callback()} | direct.
|
|
@ -16,6 +16,8 @@
|
|||
|
||||
-module(ecpool).
|
||||
|
||||
-include("ecpool.hrl").
|
||||
|
||||
-export([ pool_spec/4
|
||||
, start_pool/3
|
||||
, start_sup_pool/3
|
||||
|
@ -24,6 +26,7 @@
|
|||
, get_client/2
|
||||
, with_client/2
|
||||
, with_client/3
|
||||
, with_client/4
|
||||
, name/1
|
||||
, workers/1
|
||||
]).
|
||||
|
@ -91,21 +94,33 @@ add_reconnect_callback(Pool, Callback) ->
|
|||
ok.
|
||||
|
||||
%% @doc Call the fun with client/connection
|
||||
-spec(with_client(atom(), fun((Client :: pid()) -> any())) -> no_return()).
|
||||
with_client(Pool, Fun) when is_atom(Pool) ->
|
||||
with_worker(gproc_pool:pick_worker(name(Pool)), Fun).
|
||||
-spec(with_client(atom(), action()) -> any()).
|
||||
with_client(Pool, Action) when is_atom(Pool) ->
|
||||
with_client(Pool, Action, direct).
|
||||
|
||||
%% @doc Call the fun with client/connection
|
||||
-spec(with_client(atom(), any(), fun((Client :: pid()) -> term())) -> no_return()).
|
||||
with_client(Pool, Key, Fun) when is_atom(Pool) ->
|
||||
with_worker(gproc_pool:pick_worker(name(Pool), Key), Fun).
|
||||
-spec(with_client(atom(), action() | term(), action() | exec_mode()) -> any()).
|
||||
with_client(Pool, Key, Action) when is_atom(Pool), is_function(Action) ->
|
||||
with_client(Pool, Key, Action, direct);
|
||||
|
||||
-spec(with_worker(Worker :: pid(), fun((Client :: pid()) -> any())) -> no_return()).
|
||||
with_worker(Worker, Fun) ->
|
||||
with_client(Pool, Action, Mode) when is_atom(Pool), is_function(Action) ->
|
||||
with_worker(gproc_pool:pick_worker(name(Pool)), Action, Mode).
|
||||
|
||||
-spec(with_client(atom(), any(), fun((Client :: pid()) -> term()), exec_mode()) -> any()).
|
||||
with_client(Pool, Key, Action, Mode) when is_atom(Pool) ->
|
||||
with_worker(gproc_pool:pick_worker(name(Pool), Key), Action, Mode).
|
||||
|
||||
-spec with_worker(pid(), action(), exec_mode()) -> any().
|
||||
with_worker(Worker, Action, direct) ->
|
||||
case ecpool_worker:client(Worker) of
|
||||
{ok, Client} -> Fun(Client);
|
||||
{ok, Client} -> Action(Client);
|
||||
{error, Reason} -> {error, Reason}
|
||||
end.
|
||||
end;
|
||||
with_worker(Worker, Action, relay) ->
|
||||
ecpool_worker:exec(Worker, Action);
|
||||
with_worker(Worker, Action, relay_async) ->
|
||||
ecpool_worker:exec_async(Worker, Action);
|
||||
with_worker(Worker, Action, {relay_async, CallbackFun}) ->
|
||||
ecpool_worker:exec_async(Worker, Action, CallbackFun).
|
||||
|
||||
%% @doc Pool workers
|
||||
workers(Pool) ->
|
||||
|
|
|
@ -16,12 +16,17 @@
|
|||
|
||||
-module(ecpool_worker).
|
||||
|
||||
-include("ecpool.hrl").
|
||||
|
||||
-behaviour(gen_server).
|
||||
|
||||
-export([start_link/4]).
|
||||
|
||||
%% API Function Exports
|
||||
-export([ client/1
|
||||
, exec/2
|
||||
, exec_async/2
|
||||
, exec_async/3
|
||||
, is_connected/1
|
||||
, set_reconnect_callback/2
|
||||
, add_reconnect_callback/2
|
||||
|
@ -82,6 +87,18 @@ start_link(Pool, Id, Mod, Opts) ->
|
|||
client(Pid) ->
|
||||
gen_server:call(Pid, client, infinity).
|
||||
|
||||
-spec(exec(pid(), action()) -> Result :: any() | {error, Reason :: term()}).
|
||||
exec(Pid, Action) ->
|
||||
gen_server:call(Pid, {exec, Action}, infinity).
|
||||
|
||||
-spec(exec_async(pid(), action()) -> Result :: any() | {error, Reason :: term()}).
|
||||
exec_async(Pid, Action) ->
|
||||
gen_server:call(Pid, {exec_async, Action}).
|
||||
|
||||
-spec(exec_async(pid(), action(), callback()) -> Result :: any() | {error, Reason :: term()}).
|
||||
exec_async(Pid, Action, Callback) ->
|
||||
gen_server:call(Pid, {exec_async, Action, Callback}).
|
||||
|
||||
%% @doc Is client connected?
|
||||
-spec(is_connected(pid()) -> boolean()).
|
||||
is_connected(Pid) ->
|
||||
|
@ -128,6 +145,19 @@ handle_call(client, _From, State = #state{client = undefined}) ->
|
|||
handle_call(client, _From, State = #state{client = Client}) ->
|
||||
{reply, {ok, Client}, State};
|
||||
|
||||
handle_call({exec, Action}, _From, State = #state{client = Client}) ->
|
||||
{reply, safe_exec(Action, Client), State};
|
||||
|
||||
handle_call({exec_async, Action}, From, State = #state{client = Client}) ->
|
||||
gen_server:reply(From, ok),
|
||||
_ = safe_exec(Action, Client),
|
||||
{noreply, State};
|
||||
|
||||
handle_call({exec_async, Action, Callback}, From, State = #state{client = Client}) ->
|
||||
gen_server:reply(From, ok),
|
||||
_ = Callback(safe_exec(Action, Client)),
|
||||
{noreply, State};
|
||||
|
||||
handle_call(Req, _From, State) ->
|
||||
logger:error("[PoolWorker] unexpected call: ~p", [Req]),
|
||||
{reply, ignored, State}.
|
||||
|
@ -237,3 +267,11 @@ connect_internal(State) ->
|
|||
_C:Reason:ST -> {error, {Reason, ST}}
|
||||
end.
|
||||
|
||||
safe_exec(Action, Client) when is_pid(Client) ->
|
||||
try Action(Client)
|
||||
catch E:R:ST ->
|
||||
logger:error("[PoolWorker] safe_exec failed: ~p", [{E,R,ST}]),
|
||||
{error, {exec_failed, E, R}}
|
||||
end;
|
||||
safe_exec(_Action, undefined) ->
|
||||
{error, worker_disconnected}.
|
||||
|
|
|
@ -49,11 +49,14 @@ groups() ->
|
|||
t_start_sup_pool,
|
||||
t_restart_client,
|
||||
t_reconnect_client,
|
||||
t_client_exec_hash,
|
||||
t_client_exec_random,
|
||||
t_multiprocess_client,
|
||||
t_multiprocess_client_not_restart
|
||||
]}].
|
||||
|
||||
init_per_suite(Config) ->
|
||||
{ok, _} = application:ensure_all_started(gproc),
|
||||
{ok, _} = application:ensure_all_started(ecpool),
|
||||
Config.
|
||||
|
||||
|
@ -161,3 +164,48 @@ t_multiprocess_client_not_restart(_Config) ->
|
|||
[begin
|
||||
?assertEqual(false, ecpool_worker:is_connected(Worker))
|
||||
end || {_WorkerName, Worker} <- ecpool:workers(?POOL)].
|
||||
|
||||
t_client_exec_hash(_Config) ->
|
||||
Opts = [{pool_size, 5}, {pool_type, hash}, {auto_reconnect, false}],
|
||||
{ok, Pid1} = ecpool:start_pool(abc, test_client, Opts),
|
||||
ct:pal("----- pid: ~p", [is_process_alive(Pid1)]),
|
||||
?assertEqual(4, ecpool:with_client(abc, <<"abc">>, fun(Client) ->
|
||||
test_client:plus(Client, 1, 3)
|
||||
end, direct)),
|
||||
From = self(),
|
||||
Callback = fun(Result) -> From ! {result, Result} end,
|
||||
?assertEqual(4, ecpool:with_client(abc, <<"abc">>, fun(Client) ->
|
||||
test_client:plus(Client, 1, 3)
|
||||
end, relay)),
|
||||
?assertEqual(ok, ecpool:with_client(abc, <<"abc">>, fun(Client) ->
|
||||
test_client:plus(Client, 1, 3)
|
||||
end, relay_async)),
|
||||
?assertEqual(ok, ecpool:with_client(abc, <<"abc">>, fun(Client) ->
|
||||
test_client:plus(Client, 1, 3)
|
||||
end, {relay_async, Callback})),
|
||||
receive
|
||||
{result, 4} -> ok;
|
||||
R2 -> ct:fail({unexpected_result, R2})
|
||||
end,
|
||||
ecpool:stop_sup_pool(abc).
|
||||
|
||||
t_client_exec_random(_Config) ->
|
||||
ecpool:start_pool(?POOL, test_client, ?POOL_OPTS),
|
||||
?assertEqual(4, ecpool:with_client(?POOL, fun(Client) ->
|
||||
test_client:plus(Client, 1, 3)
|
||||
end, direct)),
|
||||
?assertEqual(4, ecpool:with_client(?POOL, fun(Client) ->
|
||||
test_client:plus(Client, 1, 3)
|
||||
end, relay)),
|
||||
?assertEqual(ok, ecpool:with_client(?POOL, fun(Client) ->
|
||||
test_client:plus(Client, 1, 3)
|
||||
end, relay_async)),
|
||||
From = self(),
|
||||
Callback = fun(Result) -> From ! {result, Result} end,
|
||||
?assertEqual(ok, ecpool:with_client(?POOL, fun(Client) ->
|
||||
test_client:plus(Client, 1, 3)
|
||||
end, {relay_async, Callback})),
|
||||
receive
|
||||
{result, 4} -> ok;
|
||||
R1 -> ct:fail({unexpected_result, R1})
|
||||
end.
|
||||
|
|
|
@ -22,8 +22,9 @@
|
|||
|
||||
-define(SERVER, ?MODULE).
|
||||
|
||||
-export([connect/1,
|
||||
stop/2
|
||||
-export([ connect/1
|
||||
, plus/3
|
||||
, stop/2
|
||||
]).
|
||||
|
||||
-export([init/1,
|
||||
|
@ -44,6 +45,9 @@ connect(Opts) ->
|
|||
gen_server:start_link(?MODULE, [Opts], [])
|
||||
end.
|
||||
|
||||
plus(Pid, L, R) ->
|
||||
gen_server:call(Pid, {plus, L, R}).
|
||||
|
||||
stop(Pid, Reason) ->
|
||||
gen_server:call(Pid, {stop, Reason}).
|
||||
|
||||
|
@ -57,6 +61,9 @@ init(Args) ->
|
|||
handle_call({stop, Reason}, _From, State) ->
|
||||
{stop, Reason, ok, State};
|
||||
|
||||
handle_call({plus, L, R}, _From, State) ->
|
||||
{reply, L + R, State};
|
||||
|
||||
handle_call(_Req, _From, State) ->
|
||||
{reply, ok, State}.
|
||||
|
||||
|
|
Loading…
Reference in New Issue