refactor(with_client): change with_client/2,3 to pick_and_do/3 (#24)
* refactor(with_client): change with_client/2,3 to pick_and_do/3 Change the actions and callbacks to type {M, F, A} to avoid badfun when hot upgrading modules.
This commit is contained in:
parent
aa5b97ec44
commit
47a223fadc
|
@ -1,3 +1,16 @@
|
||||||
-type callback() :: fun((any()) -> any()).
|
-type callback() :: mfa() | fun((any()) -> any()).
|
||||||
-type action() :: fun((pid()) -> any()).
|
-type action() :: mfa() | fun((pid()) -> any()).
|
||||||
-type exec_mode() :: relay | relay_async | {relay_async, callback()} | direct.
|
-type apply_mode() :: handover
|
||||||
|
| handover_async
|
||||||
|
| {handover, timeout()}
|
||||||
|
| {handover_async, callback()}
|
||||||
|
| no_handover.
|
||||||
|
-type pool_type() :: random | hash | direct | round_robin.
|
||||||
|
-type pool_name() :: term().
|
||||||
|
-type conn_callback() :: mfa().
|
||||||
|
-type option() :: {pool_size, pos_integer()}
|
||||||
|
| {pool_type, pool_type()}
|
||||||
|
| {auto_reconnect, false | pos_integer()}
|
||||||
|
| {on_reconnect, conn_callback()}
|
||||||
|
| {on_disconnect, conn_callback()}
|
||||||
|
| tuple().
|
||||||
|
|
|
@ -24,31 +24,18 @@
|
||||||
, stop_sup_pool/1
|
, stop_sup_pool/1
|
||||||
, get_client/1
|
, get_client/1
|
||||||
, get_client/2
|
, get_client/2
|
||||||
, with_client/2
|
, pick_and_do/3
|
||||||
, with_client/3
|
|
||||||
, with_client/4
|
|
||||||
, name/1
|
, name/1
|
||||||
, workers/1
|
, workers/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export([set_reconnect_callback/2, add_reconnect_callback/2]).
|
-export([set_reconnect_callback/2, add_reconnect_callback/2]).
|
||||||
|
|
||||||
-export_type([ pool_name/0
|
%% NOTE: Obsolete APIs.
|
||||||
, pool_type/0
|
%% Use pick_and_do/3 APIs instead
|
||||||
, option/0
|
-export([ with_client/2
|
||||||
]).
|
, with_client/3
|
||||||
|
]).
|
||||||
-type(pool_name() :: term()).
|
|
||||||
|
|
||||||
-type(pool_type() :: random | hash | round_robin).
|
|
||||||
|
|
||||||
-type(reconn_callback() :: {fun((pid()) -> term())}).
|
|
||||||
|
|
||||||
-type(option() :: {pool_size, pos_integer()}
|
|
||||||
| {pool_type, pool_type()}
|
|
||||||
| {auto_reconnect, false | pos_integer()}
|
|
||||||
| {on_reconnect, reconn_callback()}
|
|
||||||
| tuple()).
|
|
||||||
|
|
||||||
pool_spec(ChildId, Pool, Mod, Opts) ->
|
pool_spec(ChildId, Pool, Mod, Opts) ->
|
||||||
#{id => ChildId,
|
#{id => ChildId,
|
||||||
|
@ -81,45 +68,51 @@ get_client(Pool) ->
|
||||||
get_client(Pool, Key) ->
|
get_client(Pool, Key) ->
|
||||||
gproc_pool:pick_worker(name(Pool), Key).
|
gproc_pool:pick_worker(name(Pool), Key).
|
||||||
|
|
||||||
-spec(set_reconnect_callback(atom(), reconn_callback()) -> ok).
|
-spec(set_reconnect_callback(atom(), conn_callback()) -> ok).
|
||||||
set_reconnect_callback(Pool, Callback) ->
|
set_reconnect_callback(Pool, Callback) ->
|
||||||
[ecpool_worker:set_reconnect_callback(Worker, Callback)
|
[ecpool_worker:set_reconnect_callback(Worker, Callback)
|
||||||
|| {_WorkerName, Worker} <- ecpool:workers(Pool)],
|
|| {_WorkerName, Worker} <- ecpool:workers(Pool)],
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
-spec(add_reconnect_callback(atom(), reconn_callback()) -> ok).
|
-spec(add_reconnect_callback(atom(), conn_callback()) -> ok).
|
||||||
add_reconnect_callback(Pool, Callback) ->
|
add_reconnect_callback(Pool, Callback) ->
|
||||||
[ecpool_worker:add_reconnect_callback(Worker, Callback)
|
[ecpool_worker:add_reconnect_callback(Worker, Callback)
|
||||||
|| {_WorkerName, Worker} <- ecpool:workers(Pool)],
|
|| {_WorkerName, Worker} <- ecpool:workers(Pool)],
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
%% NOTE: Use pick_and_do/3 instead of with_client/2,3
|
||||||
|
%% to avoid applying action failure with 'badfun'.
|
||||||
|
%%
|
||||||
%% @doc Call the fun with client/connection
|
%% @doc Call the fun with client/connection
|
||||||
-spec(with_client(atom(), action()) -> any()).
|
-spec(with_client(atom(), fun((Client :: pid()) -> any())) -> no_return()).
|
||||||
with_client(Pool, Action) when is_atom(Pool) ->
|
with_client(Pool, Fun) when is_atom(Pool) ->
|
||||||
with_client(Pool, Action, direct).
|
with_worker(gproc_pool:pick_worker(name(Pool)), Fun, no_handover).
|
||||||
|
|
||||||
-spec(with_client(atom(), action() | term(), action() | exec_mode()) -> any()).
|
%% @doc Call the fun with client/connection
|
||||||
with_client(Pool, Key, Action) when is_atom(Pool), is_function(Action) ->
|
-spec(with_client(atom(), any(), fun((Client :: pid()) -> term())) -> no_return()).
|
||||||
with_client(Pool, Key, Action, direct);
|
with_client(Pool, Key, Fun) when is_atom(Pool) ->
|
||||||
|
with_worker(gproc_pool:pick_worker(name(Pool), Key), Fun, no_handover).
|
||||||
|
|
||||||
with_client(Pool, Action, Mode) when is_atom(Pool), is_function(Action) ->
|
-spec pick_and_do({atom(), term()}, mfa(), apply_mode()) -> any().
|
||||||
with_worker(gproc_pool:pick_worker(name(Pool)), Action, Mode).
|
pick_and_do({Pool, KeyOrNum}, Action = {_,_,_}, ApplyMode) ->
|
||||||
|
with_worker(gproc_pool:pick_worker(name(Pool), KeyOrNum), Action, ApplyMode);
|
||||||
|
|
||||||
-spec(with_client(atom(), any(), fun((Client :: pid()) -> term()), exec_mode()) -> any()).
|
pick_and_do(Pool, Action = {_,_,_}, ApplyMode) ->
|
||||||
with_client(Pool, Key, Action, Mode) when is_atom(Pool) ->
|
with_worker(gproc_pool:pick_worker(name(Pool)), Action, ApplyMode).
|
||||||
with_worker(gproc_pool:pick_worker(name(Pool), Key), Action, Mode).
|
|
||||||
|
|
||||||
-spec with_worker(pid(), action(), exec_mode()) -> any().
|
-spec with_worker(pid(), action(), apply_mode()) -> any().
|
||||||
with_worker(Worker, Action, direct) ->
|
with_worker(Worker, Action, no_handover) ->
|
||||||
case ecpool_worker:client(Worker) of
|
case ecpool_worker:client(Worker) of
|
||||||
{ok, Client} -> Action(Client);
|
{ok, Client} -> exec(Action, Client);
|
||||||
{error, Reason} -> {error, Reason}
|
{error, Reason} -> {error, Reason}
|
||||||
end;
|
end;
|
||||||
with_worker(Worker, Action, relay) ->
|
with_worker(Worker, Action, handover) ->
|
||||||
ecpool_worker:exec(Worker, Action);
|
ecpool_worker:exec(Worker, Action, infinity);
|
||||||
with_worker(Worker, Action, relay_async) ->
|
with_worker(Worker, Action, {handover, Timeout}) when is_integer(Timeout) ->
|
||||||
|
ecpool_worker:exec(Worker, Action, Timeout);
|
||||||
|
with_worker(Worker, Action, handover_async) ->
|
||||||
ecpool_worker:exec_async(Worker, Action);
|
ecpool_worker:exec_async(Worker, Action);
|
||||||
with_worker(Worker, Action, {relay_async, CallbackFun}) ->
|
with_worker(Worker, Action, {handover_async, CallbackFun = {_,_,_}}) ->
|
||||||
ecpool_worker:exec_async(Worker, Action, CallbackFun).
|
ecpool_worker:exec_async(Worker, Action, CallbackFun).
|
||||||
|
|
||||||
%% @doc Pool workers
|
%% @doc Pool workers
|
||||||
|
@ -129,3 +122,7 @@ workers(Pool) ->
|
||||||
%% @doc ecpool name
|
%% @doc ecpool name
|
||||||
name(Pool) -> {?MODULE, Pool}.
|
name(Pool) -> {?MODULE, Pool}.
|
||||||
|
|
||||||
|
exec({M, F, A}, Client) ->
|
||||||
|
erlang:apply(M, F, [Client]++A);
|
||||||
|
exec(Action, Client) when is_function(Action) ->
|
||||||
|
Action(Client).
|
||||||
|
|
|
@ -24,12 +24,14 @@
|
||||||
|
|
||||||
%% API Function Exports
|
%% API Function Exports
|
||||||
-export([ client/1
|
-export([ client/1
|
||||||
, exec/2
|
, exec/3
|
||||||
, exec_async/2
|
, exec_async/2
|
||||||
, exec_async/3
|
, exec_async/3
|
||||||
, is_connected/1
|
, is_connected/1
|
||||||
, set_reconnect_callback/2
|
, set_reconnect_callback/2
|
||||||
|
, set_disconnect_callback/2
|
||||||
, add_reconnect_callback/2
|
, add_reconnect_callback/2
|
||||||
|
, add_disconnect_callback/2
|
||||||
]).
|
]).
|
||||||
|
|
||||||
%% gen_server Function Exports
|
%% gen_server Function Exports
|
||||||
|
@ -46,8 +48,8 @@
|
||||||
id :: pos_integer(),
|
id :: pos_integer(),
|
||||||
client :: pid() | undefined,
|
client :: pid() | undefined,
|
||||||
mod :: module(),
|
mod :: module(),
|
||||||
on_reconnect :: ecpool:reconn_callback(),
|
on_reconnect :: ecpool:conn_callback(),
|
||||||
on_disconnect :: ecpool:reconn_callback(),
|
on_disconnect :: ecpool:conn_callback(),
|
||||||
supervisees = [],
|
supervisees = [],
|
||||||
opts :: proplists:proplist()
|
opts :: proplists:proplist()
|
||||||
}).
|
}).
|
||||||
|
@ -74,31 +76,39 @@ start_link(Pool, Id, Mod, Opts) ->
|
||||||
client(Pid) ->
|
client(Pid) ->
|
||||||
gen_server:call(Pid, client, infinity).
|
gen_server:call(Pid, client, infinity).
|
||||||
|
|
||||||
-spec(exec(pid(), action()) -> Result :: any() | {error, Reason :: term()}).
|
-spec(exec(pid(), action(), timeout()) -> Result :: any() | {error, Reason :: term()}).
|
||||||
exec(Pid, Action) ->
|
exec(Pid, Action, Timeout) ->
|
||||||
gen_server:call(Pid, {exec, Action}, infinity).
|
gen_server:call(Pid, {exec, Action}, Timeout).
|
||||||
|
|
||||||
-spec(exec_async(pid(), action()) -> Result :: any() | {error, Reason :: term()}).
|
-spec exec_async(pid(), action()) -> ok.
|
||||||
exec_async(Pid, Action) ->
|
exec_async(Pid, Action) ->
|
||||||
gen_server:call(Pid, {exec_async, Action}).
|
gen_server:cast(Pid, {exec_async, Action}).
|
||||||
|
|
||||||
-spec(exec_async(pid(), action(), callback()) -> Result :: any() | {error, Reason :: term()}).
|
-spec exec_async(pid(), action(), callback()) -> ok.
|
||||||
exec_async(Pid, Action, Callback) ->
|
exec_async(Pid, Action, Callback) ->
|
||||||
gen_server:call(Pid, {exec_async, Action, Callback}).
|
gen_server:cast(Pid, {exec_async, Action, Callback}).
|
||||||
|
|
||||||
%% @doc Is client connected?
|
%% @doc Is client connected?
|
||||||
-spec(is_connected(pid()) -> boolean()).
|
-spec(is_connected(pid()) -> boolean()).
|
||||||
is_connected(Pid) ->
|
is_connected(Pid) ->
|
||||||
gen_server:call(Pid, is_connected, infinity).
|
gen_server:call(Pid, is_connected, infinity).
|
||||||
|
|
||||||
-spec(set_reconnect_callback(pid(), ecpool:reconn_callback()) -> ok).
|
-spec(set_reconnect_callback(pid(), ecpool:conn_callback()) -> ok).
|
||||||
set_reconnect_callback(Pid, OnReconnect) ->
|
set_reconnect_callback(Pid, OnReconnect = {_, _, _}) ->
|
||||||
gen_server:cast(Pid, {set_reconn_callbk, OnReconnect}).
|
gen_server:cast(Pid, {set_reconn_callbk, OnReconnect}).
|
||||||
|
|
||||||
-spec(add_reconnect_callback(pid(), ecpool:reconn_callback()) -> ok).
|
-spec(set_disconnect_callback(pid(), ecpool:conn_callback()) -> ok).
|
||||||
add_reconnect_callback(Pid, OnReconnect) ->
|
set_disconnect_callback(Pid, OnDisconnect = {_, _, _}) ->
|
||||||
|
gen_server:cast(Pid, {set_disconn_callbk, OnDisconnect}).
|
||||||
|
|
||||||
|
-spec(add_reconnect_callback(pid(), ecpool:conn_callback()) -> ok).
|
||||||
|
add_reconnect_callback(Pid, OnReconnect = {_, _, _}) ->
|
||||||
gen_server:cast(Pid, {add_reconn_callbk, OnReconnect}).
|
gen_server:cast(Pid, {add_reconn_callbk, OnReconnect}).
|
||||||
|
|
||||||
|
-spec(add_disconnect_callback(pid(), ecpool:conn_callback()) -> ok).
|
||||||
|
add_disconnect_callback(Pid, OnDisconnect = {_, _, _}) ->
|
||||||
|
gen_server:cast(Pid, {add_disconn_callbk, OnDisconnect}).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% gen_server callbacks
|
%% gen_server callbacks
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -109,8 +119,8 @@ init([Pool, Id, Mod, Opts]) ->
|
||||||
id = Id,
|
id = Id,
|
||||||
mod = Mod,
|
mod = Mod,
|
||||||
opts = Opts,
|
opts = Opts,
|
||||||
on_reconnect = proplists:get_value(on_reconnect, Opts),
|
on_reconnect = ensure_callback(proplists:get_value(on_reconnect, Opts)),
|
||||||
on_disconnect = proplists:get_value(on_disconnect, Opts)
|
on_disconnect = ensure_callback(proplists:get_value(on_disconnect, Opts))
|
||||||
},
|
},
|
||||||
case connect_internal(State) of
|
case connect_internal(State) of
|
||||||
{ok, NewState} ->
|
{ok, NewState} ->
|
||||||
|
@ -135,31 +145,29 @@ handle_call(client, _From, State = #state{client = Client}) ->
|
||||||
handle_call({exec, Action}, _From, State = #state{client = Client}) ->
|
handle_call({exec, Action}, _From, State = #state{client = Client}) ->
|
||||||
{reply, safe_exec(Action, Client), State};
|
{reply, safe_exec(Action, Client), State};
|
||||||
|
|
||||||
handle_call({exec_async, Action}, From, State = #state{client = Client}) ->
|
|
||||||
gen_server:reply(From, ok),
|
|
||||||
_ = safe_exec(Action, Client),
|
|
||||||
{noreply, State};
|
|
||||||
|
|
||||||
handle_call({exec_async, Action, Callback}, From, State = #state{client = Client}) ->
|
|
||||||
gen_server:reply(From, ok),
|
|
||||||
_ = Callback(safe_exec(Action, Client)),
|
|
||||||
{noreply, State};
|
|
||||||
|
|
||||||
handle_call(Req, _From, State) ->
|
handle_call(Req, _From, State) ->
|
||||||
logger:error("[PoolWorker] unexpected call: ~p", [Req]),
|
logger:error("[PoolWorker] unexpected call: ~p", [Req]),
|
||||||
{reply, ignored, State}.
|
{reply, ignored, State}.
|
||||||
|
|
||||||
|
handle_cast({exec_async, Action}, State = #state{client = Client}) ->
|
||||||
|
_ = safe_exec(Action, Client),
|
||||||
|
{noreply, State};
|
||||||
|
|
||||||
|
handle_cast({exec_async, Action, Callback}, State = #state{client = Client}) ->
|
||||||
|
_ = safe_exec(Callback, safe_exec(Action, Client)),
|
||||||
|
{noreply, State};
|
||||||
|
|
||||||
handle_cast({set_reconn_callbk, OnReconnect}, State) ->
|
handle_cast({set_reconn_callbk, OnReconnect}, State) ->
|
||||||
{noreply, State#state{on_reconnect = OnReconnect}};
|
{noreply, State#state{on_reconnect = OnReconnect}};
|
||||||
|
|
||||||
handle_cast({add_reconn_callbk, OnReconnect}, State = #state{on_reconnect = OnReconnectList}) when is_list(OnReconnectList) ->
|
handle_cast({set_disconn_callbk, OnDisconnect}, State) ->
|
||||||
{noreply, State#state{on_reconnect = [OnReconnect | OnReconnectList]}};
|
{noreply, State#state{on_disconnect = OnDisconnect}};
|
||||||
|
|
||||||
handle_cast({add_reconn_callbk, OnReconnect}, State = #state{on_reconnect = undefined}) ->
|
handle_cast({add_reconn_callbk, OnReconnect}, State = #state{on_reconnect = OldOnReconnect}) ->
|
||||||
{noreply, State#state{on_reconnect = [OnReconnect]}};
|
{noreply, State#state{on_reconnect = add_conn_callback(OnReconnect, OldOnReconnect)}};
|
||||||
|
|
||||||
handle_cast({add_reconn_callbk, OnReconnect}, State = #state{on_reconnect = OnReconnect0}) ->
|
handle_cast({add_disconn_callbk, OnDisconnect}, State = #state{on_disconnect = OldOnDisconnect}) ->
|
||||||
{noreply, State#state{on_reconnect = [OnReconnect, OnReconnect0]}};
|
{noreply, State#state{on_disconnect = add_conn_callback(OnDisconnect, OldOnDisconnect)}};
|
||||||
|
|
||||||
handle_cast(_Msg, State) ->
|
handle_cast(_Msg, State) ->
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
@ -229,16 +237,16 @@ handle_reconnect(_, undefined) ->
|
||||||
handle_reconnect(undefined, _) ->
|
handle_reconnect(undefined, _) ->
|
||||||
ok;
|
ok;
|
||||||
handle_reconnect(Client, OnReconnectList) when is_list(OnReconnectList) ->
|
handle_reconnect(Client, OnReconnectList) when is_list(OnReconnectList) ->
|
||||||
[OnReconnect(Client) || OnReconnect <- OnReconnectList];
|
[safe_exec(OnReconnect, Client) || OnReconnect <- OnReconnectList];
|
||||||
handle_reconnect(Client, OnReconnect) ->
|
handle_reconnect(Client, OnReconnect) ->
|
||||||
OnReconnect(Client).
|
safe_exec(OnReconnect, Client).
|
||||||
|
|
||||||
handle_disconnect(undefined, _) ->
|
handle_disconnect(undefined, _) ->
|
||||||
ok;
|
ok;
|
||||||
handle_disconnect(_, undefined) ->
|
handle_disconnect(_, undefined) ->
|
||||||
ok;
|
ok;
|
||||||
handle_disconnect(Client, Disconnect) ->
|
handle_disconnect(Client, Disconnect) ->
|
||||||
Disconnect(Client).
|
safe_exec(Disconnect, Client).
|
||||||
|
|
||||||
connect_internal(State) ->
|
connect_internal(State) ->
|
||||||
try connect(State) of
|
try connect(State) of
|
||||||
|
@ -254,11 +262,24 @@ connect_internal(State) ->
|
||||||
_C:Reason:ST -> {error, {Reason, ST}}
|
_C:Reason:ST -> {error, {Reason, ST}}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
safe_exec(Action, Client) when is_pid(Client) ->
|
safe_exec(Action, MainArg) ->
|
||||||
try Action(Client)
|
try exec(Action, MainArg)
|
||||||
catch E:R:ST ->
|
catch E:R:ST ->
|
||||||
logger:error("[PoolWorker] safe_exec failed: ~p", [{E,R,ST}]),
|
logger:error("[PoolWorker] safe_exec ~p, failed: ~0p", [Action, {E,R,ST}]),
|
||||||
{error, {exec_failed, E, R}}
|
{error, {exec_failed, E, R}}
|
||||||
end;
|
end.
|
||||||
safe_exec(_Action, undefined) ->
|
|
||||||
{error, worker_disconnected}.
|
exec({M, F, A}, MainArg) ->
|
||||||
|
erlang:apply(M, F, [MainArg]++A);
|
||||||
|
exec(Action, MainArg) when is_function(Action) ->
|
||||||
|
Action(MainArg).
|
||||||
|
|
||||||
|
ensure_callback(undefined) -> undefined;
|
||||||
|
ensure_callback({_,_,_} = Callback) -> Callback.
|
||||||
|
|
||||||
|
add_conn_callback(OnReconnect, OldOnReconnects) when is_list(OldOnReconnects) ->
|
||||||
|
[OnReconnect | OldOnReconnects];
|
||||||
|
add_conn_callback(OnReconnect, undefined) ->
|
||||||
|
[OnReconnect];
|
||||||
|
add_conn_callback(OnReconnect, OldOnReconnect) ->
|
||||||
|
[OnReconnect, OldOnReconnect].
|
||||||
|
|
|
@ -50,7 +50,9 @@ groups() ->
|
||||||
t_restart_client,
|
t_restart_client,
|
||||||
t_reconnect_client,
|
t_reconnect_client,
|
||||||
t_client_exec_hash,
|
t_client_exec_hash,
|
||||||
|
t_client_exec2_hash,
|
||||||
t_client_exec_random,
|
t_client_exec_random,
|
||||||
|
t_client_exec2_random,
|
||||||
t_multiprocess_client,
|
t_multiprocess_client,
|
||||||
t_multiprocess_client_not_restart
|
t_multiprocess_client_not_restart
|
||||||
]}].
|
]}].
|
||||||
|
@ -171,18 +173,23 @@ t_client_exec_hash(_Config) ->
|
||||||
ct:pal("----- pid: ~p", [is_process_alive(Pid1)]),
|
ct:pal("----- pid: ~p", [is_process_alive(Pid1)]),
|
||||||
?assertEqual(4, ecpool:with_client(abc, <<"abc">>, fun(Client) ->
|
?assertEqual(4, ecpool:with_client(abc, <<"abc">>, fun(Client) ->
|
||||||
test_client:plus(Client, 1, 3)
|
test_client:plus(Client, 1, 3)
|
||||||
end, direct)),
|
end)),
|
||||||
From = self(),
|
?assertEqual(4, ecpool:with_client(abc, 2, fun(Client) ->
|
||||||
Callback = fun(Result) -> From ! {result, Result} end,
|
|
||||||
?assertEqual(4, ecpool:with_client(abc, <<"abc">>, fun(Client) ->
|
|
||||||
test_client:plus(Client, 1, 3)
|
test_client:plus(Client, 1, 3)
|
||||||
end, relay)),
|
end)),
|
||||||
?assertEqual(ok, ecpool:with_client(abc, <<"abc">>, fun(Client) ->
|
ecpool:stop_sup_pool(abc).
|
||||||
test_client:plus(Client, 1, 3)
|
|
||||||
end, relay_async)),
|
t_client_exec2_hash(_Config) ->
|
||||||
?assertEqual(ok, ecpool:with_client(abc, <<"abc">>, fun(Client) ->
|
Action = {test_client, plus, [1,3]},
|
||||||
test_client:plus(Client, 1, 3)
|
Opts = [{pool_size, 5}, {pool_type, hash}, {auto_reconnect, false}],
|
||||||
end, {relay_async, Callback})),
|
{ok, Pid1} = ecpool:start_pool(abc, test_client, Opts),
|
||||||
|
ct:pal("----- pid: ~p", [is_process_alive(Pid1)]),
|
||||||
|
?assertEqual(4, ecpool:pick_and_do({abc, <<"abc">>}, Action, no_handover)),
|
||||||
|
?assertEqual(4, ecpool:pick_and_do({abc, 3}, Action, no_handover)),
|
||||||
|
Callback = {test_client, callback, [self()]},
|
||||||
|
?assertEqual(4, ecpool:pick_and_do({abc, <<"abc">>}, Action, handover)),
|
||||||
|
?assertEqual(ok, ecpool:pick_and_do({abc, 1}, Action, handover_async)),
|
||||||
|
?assertEqual(ok, ecpool:pick_and_do({abc, <<"abc">>}, Action, {handover_async, Callback})),
|
||||||
receive
|
receive
|
||||||
{result, 4} -> ok;
|
{result, 4} -> ok;
|
||||||
R2 -> ct:fail({unexpected_result, R2})
|
R2 -> ct:fail({unexpected_result, R2})
|
||||||
|
@ -193,19 +200,17 @@ t_client_exec_random(_Config) ->
|
||||||
ecpool:start_pool(?POOL, test_client, ?POOL_OPTS),
|
ecpool:start_pool(?POOL, test_client, ?POOL_OPTS),
|
||||||
?assertEqual(4, ecpool:with_client(?POOL, fun(Client) ->
|
?assertEqual(4, ecpool:with_client(?POOL, fun(Client) ->
|
||||||
test_client:plus(Client, 1, 3)
|
test_client:plus(Client, 1, 3)
|
||||||
end, direct)),
|
end)).
|
||||||
?assertEqual(4, ecpool:with_client(?POOL, fun(Client) ->
|
|
||||||
test_client:plus(Client, 1, 3)
|
t_client_exec2_random(_Config) ->
|
||||||
end, relay)),
|
Action = {test_client, plus, [1,3]},
|
||||||
?assertEqual(ok, ecpool:with_client(?POOL, fun(Client) ->
|
ecpool:start_pool(?POOL, test_client, ?POOL_OPTS),
|
||||||
test_client:plus(Client, 1, 3)
|
?assertEqual(4, ecpool:pick_and_do(?POOL, Action, no_handover)),
|
||||||
end, relay_async)),
|
?assertEqual(4, ecpool:pick_and_do(?POOL, Action, handover)),
|
||||||
From = self(),
|
?assertEqual(ok, ecpool:pick_and_do(?POOL, Action, handover_async)),
|
||||||
Callback = fun(Result) -> From ! {result, Result} end,
|
Callback = {test_client, callback, [self()]},
|
||||||
?assertEqual(ok, ecpool:with_client(?POOL, fun(Client) ->
|
?assertEqual(ok, ecpool:pick_and_do(?POOL, Action, {handover_async, Callback})),
|
||||||
test_client:plus(Client, 1, 3)
|
|
||||||
end, {relay_async, Callback})),
|
|
||||||
receive
|
receive
|
||||||
{result, 4} -> ok;
|
{result, 4} -> ok;
|
||||||
R1 -> ct:fail({unexpected_result, R1})
|
R1 -> ct:fail({unexpected_result, R1})
|
||||||
end.
|
end.
|
|
@ -24,6 +24,7 @@
|
||||||
|
|
||||||
-export([ connect/1
|
-export([ connect/1
|
||||||
, plus/3
|
, plus/3
|
||||||
|
, callback/2
|
||||||
, stop/2
|
, stop/2
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
@ -48,6 +49,9 @@ connect(Opts) ->
|
||||||
plus(Pid, L, R) ->
|
plus(Pid, L, R) ->
|
||||||
gen_server:call(Pid, {plus, L, R}).
|
gen_server:call(Pid, {plus, L, R}).
|
||||||
|
|
||||||
|
callback(Result, SendTo) ->
|
||||||
|
SendTo ! {result, Result}.
|
||||||
|
|
||||||
stop(Pid, Reason) ->
|
stop(Pid, Reason) ->
|
||||||
gen_server:call(Pid, {stop, Reason}).
|
gen_server:call(Pid, {stop, Reason}).
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue