From 47a223fadc9e902d21121e9d154165ff00dc17ad Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Tue, 24 Nov 2020 09:22:46 +0800 Subject: [PATCH] refactor(with_client): change with_client/2,3 to pick_and_do/3 (#24) * refactor(with_client): change with_client/2,3 to pick_and_do/3 Change the actions and callbacks to type {M, F, A} to avoid badfun when hot upgrading modules. --- include/ecpool.hrl | 19 ++++++-- src/ecpool.erl | 75 +++++++++++++++--------------- src/ecpool_worker.erl | 103 +++++++++++++++++++++++++----------------- test/ecpool_SUITE.erl | 53 ++++++++++++---------- test/test_client.erl | 4 ++ 5 files changed, 147 insertions(+), 107 deletions(-) diff --git a/include/ecpool.hrl b/include/ecpool.hrl index 8abcbd5d5..f7e2dbd4d 100644 --- a/include/ecpool.hrl +++ b/include/ecpool.hrl @@ -1,3 +1,16 @@ --type callback() :: fun((any()) -> any()). --type action() :: fun((pid()) -> any()). --type exec_mode() :: relay | relay_async | {relay_async, callback()} | direct. \ No newline at end of file +-type callback() :: mfa() | fun((any()) -> any()). +-type action() :: mfa() | fun((pid()) -> any()). +-type apply_mode() :: handover + | handover_async + | {handover, timeout()} + | {handover_async, callback()} + | no_handover. +-type pool_type() :: random | hash | direct | round_robin. +-type pool_name() :: term(). +-type conn_callback() :: mfa(). +-type option() :: {pool_size, pos_integer()} + | {pool_type, pool_type()} + | {auto_reconnect, false | pos_integer()} + | {on_reconnect, conn_callback()} + | {on_disconnect, conn_callback()} + | tuple(). diff --git a/src/ecpool.erl b/src/ecpool.erl index b8040093b..a9749df66 100644 --- a/src/ecpool.erl +++ b/src/ecpool.erl @@ -24,31 +24,18 @@ , stop_sup_pool/1 , get_client/1 , get_client/2 - , with_client/2 - , with_client/3 - , with_client/4 + , pick_and_do/3 , name/1 , workers/1 ]). -export([set_reconnect_callback/2, add_reconnect_callback/2]). --export_type([ pool_name/0 - , pool_type/0 - , option/0 - ]). - --type(pool_name() :: term()). - --type(pool_type() :: random | hash | round_robin). - --type(reconn_callback() :: {fun((pid()) -> term())}). - --type(option() :: {pool_size, pos_integer()} - | {pool_type, pool_type()} - | {auto_reconnect, false | pos_integer()} - | {on_reconnect, reconn_callback()} - | tuple()). +%% NOTE: Obsolete APIs. +%% Use pick_and_do/3 APIs instead +-export([ with_client/2 + , with_client/3 + ]). pool_spec(ChildId, Pool, Mod, Opts) -> #{id => ChildId, @@ -81,45 +68,51 @@ get_client(Pool) -> get_client(Pool, Key) -> gproc_pool:pick_worker(name(Pool), Key). --spec(set_reconnect_callback(atom(), reconn_callback()) -> ok). +-spec(set_reconnect_callback(atom(), conn_callback()) -> ok). set_reconnect_callback(Pool, Callback) -> [ecpool_worker:set_reconnect_callback(Worker, Callback) || {_WorkerName, Worker} <- ecpool:workers(Pool)], ok. --spec(add_reconnect_callback(atom(), reconn_callback()) -> ok). +-spec(add_reconnect_callback(atom(), conn_callback()) -> ok). add_reconnect_callback(Pool, Callback) -> [ecpool_worker:add_reconnect_callback(Worker, Callback) || {_WorkerName, Worker} <- ecpool:workers(Pool)], ok. +%% NOTE: Use pick_and_do/3 instead of with_client/2,3 +%% to avoid applying action failure with 'badfun'. +%% %% @doc Call the fun with client/connection --spec(with_client(atom(), action()) -> any()). -with_client(Pool, Action) when is_atom(Pool) -> - with_client(Pool, Action, direct). +-spec(with_client(atom(), fun((Client :: pid()) -> any())) -> no_return()). +with_client(Pool, Fun) when is_atom(Pool) -> + with_worker(gproc_pool:pick_worker(name(Pool)), Fun, no_handover). --spec(with_client(atom(), action() | term(), action() | exec_mode()) -> any()). -with_client(Pool, Key, Action) when is_atom(Pool), is_function(Action) -> - with_client(Pool, Key, Action, direct); +%% @doc Call the fun with client/connection +-spec(with_client(atom(), any(), fun((Client :: pid()) -> term())) -> no_return()). +with_client(Pool, Key, Fun) when is_atom(Pool) -> + with_worker(gproc_pool:pick_worker(name(Pool), Key), Fun, no_handover). -with_client(Pool, Action, Mode) when is_atom(Pool), is_function(Action) -> - with_worker(gproc_pool:pick_worker(name(Pool)), Action, Mode). +-spec pick_and_do({atom(), term()}, mfa(), apply_mode()) -> any(). +pick_and_do({Pool, KeyOrNum}, Action = {_,_,_}, ApplyMode) -> + with_worker(gproc_pool:pick_worker(name(Pool), KeyOrNum), Action, ApplyMode); --spec(with_client(atom(), any(), fun((Client :: pid()) -> term()), exec_mode()) -> any()). -with_client(Pool, Key, Action, Mode) when is_atom(Pool) -> - with_worker(gproc_pool:pick_worker(name(Pool), Key), Action, Mode). +pick_and_do(Pool, Action = {_,_,_}, ApplyMode) -> + with_worker(gproc_pool:pick_worker(name(Pool)), Action, ApplyMode). --spec with_worker(pid(), action(), exec_mode()) -> any(). -with_worker(Worker, Action, direct) -> +-spec with_worker(pid(), action(), apply_mode()) -> any(). +with_worker(Worker, Action, no_handover) -> case ecpool_worker:client(Worker) of - {ok, Client} -> Action(Client); + {ok, Client} -> exec(Action, Client); {error, Reason} -> {error, Reason} end; -with_worker(Worker, Action, relay) -> - ecpool_worker:exec(Worker, Action); -with_worker(Worker, Action, relay_async) -> +with_worker(Worker, Action, handover) -> + ecpool_worker:exec(Worker, Action, infinity); +with_worker(Worker, Action, {handover, Timeout}) when is_integer(Timeout) -> + ecpool_worker:exec(Worker, Action, Timeout); +with_worker(Worker, Action, handover_async) -> ecpool_worker:exec_async(Worker, Action); -with_worker(Worker, Action, {relay_async, CallbackFun}) -> +with_worker(Worker, Action, {handover_async, CallbackFun = {_,_,_}}) -> ecpool_worker:exec_async(Worker, Action, CallbackFun). %% @doc Pool workers @@ -129,3 +122,7 @@ workers(Pool) -> %% @doc ecpool name name(Pool) -> {?MODULE, Pool}. +exec({M, F, A}, Client) -> + erlang:apply(M, F, [Client]++A); +exec(Action, Client) when is_function(Action) -> + Action(Client). diff --git a/src/ecpool_worker.erl b/src/ecpool_worker.erl index 949679580..da550beb4 100644 --- a/src/ecpool_worker.erl +++ b/src/ecpool_worker.erl @@ -24,12 +24,14 @@ %% API Function Exports -export([ client/1 - , exec/2 + , exec/3 , exec_async/2 , exec_async/3 , is_connected/1 , set_reconnect_callback/2 + , set_disconnect_callback/2 , add_reconnect_callback/2 + , add_disconnect_callback/2 ]). %% gen_server Function Exports @@ -46,8 +48,8 @@ id :: pos_integer(), client :: pid() | undefined, mod :: module(), - on_reconnect :: ecpool:reconn_callback(), - on_disconnect :: ecpool:reconn_callback(), + on_reconnect :: ecpool:conn_callback(), + on_disconnect :: ecpool:conn_callback(), supervisees = [], opts :: proplists:proplist() }). @@ -74,31 +76,39 @@ start_link(Pool, Id, Mod, Opts) -> client(Pid) -> gen_server:call(Pid, client, infinity). --spec(exec(pid(), action()) -> Result :: any() | {error, Reason :: term()}). -exec(Pid, Action) -> - gen_server:call(Pid, {exec, Action}, infinity). +-spec(exec(pid(), action(), timeout()) -> Result :: any() | {error, Reason :: term()}). +exec(Pid, Action, Timeout) -> + gen_server:call(Pid, {exec, Action}, Timeout). --spec(exec_async(pid(), action()) -> Result :: any() | {error, Reason :: term()}). +-spec exec_async(pid(), action()) -> ok. exec_async(Pid, Action) -> - gen_server:call(Pid, {exec_async, Action}). + gen_server:cast(Pid, {exec_async, Action}). --spec(exec_async(pid(), action(), callback()) -> Result :: any() | {error, Reason :: term()}). +-spec exec_async(pid(), action(), callback()) -> ok. exec_async(Pid, Action, Callback) -> - gen_server:call(Pid, {exec_async, Action, Callback}). + gen_server:cast(Pid, {exec_async, Action, Callback}). %% @doc Is client connected? -spec(is_connected(pid()) -> boolean()). is_connected(Pid) -> gen_server:call(Pid, is_connected, infinity). --spec(set_reconnect_callback(pid(), ecpool:reconn_callback()) -> ok). -set_reconnect_callback(Pid, OnReconnect) -> +-spec(set_reconnect_callback(pid(), ecpool:conn_callback()) -> ok). +set_reconnect_callback(Pid, OnReconnect = {_, _, _}) -> gen_server:cast(Pid, {set_reconn_callbk, OnReconnect}). --spec(add_reconnect_callback(pid(), ecpool:reconn_callback()) -> ok). -add_reconnect_callback(Pid, OnReconnect) -> +-spec(set_disconnect_callback(pid(), ecpool:conn_callback()) -> ok). +set_disconnect_callback(Pid, OnDisconnect = {_, _, _}) -> + gen_server:cast(Pid, {set_disconn_callbk, OnDisconnect}). + +-spec(add_reconnect_callback(pid(), ecpool:conn_callback()) -> ok). +add_reconnect_callback(Pid, OnReconnect = {_, _, _}) -> gen_server:cast(Pid, {add_reconn_callbk, OnReconnect}). +-spec(add_disconnect_callback(pid(), ecpool:conn_callback()) -> ok). +add_disconnect_callback(Pid, OnDisconnect = {_, _, _}) -> + gen_server:cast(Pid, {add_disconn_callbk, OnDisconnect}). + %%-------------------------------------------------------------------- %% gen_server callbacks %%-------------------------------------------------------------------- @@ -109,8 +119,8 @@ init([Pool, Id, Mod, Opts]) -> id = Id, mod = Mod, opts = Opts, - on_reconnect = proplists:get_value(on_reconnect, Opts), - on_disconnect = proplists:get_value(on_disconnect, Opts) + on_reconnect = ensure_callback(proplists:get_value(on_reconnect, Opts)), + on_disconnect = ensure_callback(proplists:get_value(on_disconnect, Opts)) }, case connect_internal(State) of {ok, NewState} -> @@ -135,31 +145,29 @@ handle_call(client, _From, State = #state{client = Client}) -> handle_call({exec, Action}, _From, State = #state{client = Client}) -> {reply, safe_exec(Action, Client), State}; -handle_call({exec_async, Action}, From, State = #state{client = Client}) -> - gen_server:reply(From, ok), - _ = safe_exec(Action, Client), - {noreply, State}; - -handle_call({exec_async, Action, Callback}, From, State = #state{client = Client}) -> - gen_server:reply(From, ok), - _ = Callback(safe_exec(Action, Client)), - {noreply, State}; - handle_call(Req, _From, State) -> logger:error("[PoolWorker] unexpected call: ~p", [Req]), {reply, ignored, State}. +handle_cast({exec_async, Action}, State = #state{client = Client}) -> + _ = safe_exec(Action, Client), + {noreply, State}; + +handle_cast({exec_async, Action, Callback}, State = #state{client = Client}) -> + _ = safe_exec(Callback, safe_exec(Action, Client)), + {noreply, State}; + handle_cast({set_reconn_callbk, OnReconnect}, State) -> {noreply, State#state{on_reconnect = OnReconnect}}; -handle_cast({add_reconn_callbk, OnReconnect}, State = #state{on_reconnect = OnReconnectList}) when is_list(OnReconnectList) -> - {noreply, State#state{on_reconnect = [OnReconnect | OnReconnectList]}}; +handle_cast({set_disconn_callbk, OnDisconnect}, State) -> + {noreply, State#state{on_disconnect = OnDisconnect}}; -handle_cast({add_reconn_callbk, OnReconnect}, State = #state{on_reconnect = undefined}) -> - {noreply, State#state{on_reconnect = [OnReconnect]}}; +handle_cast({add_reconn_callbk, OnReconnect}, State = #state{on_reconnect = OldOnReconnect}) -> + {noreply, State#state{on_reconnect = add_conn_callback(OnReconnect, OldOnReconnect)}}; -handle_cast({add_reconn_callbk, OnReconnect}, State = #state{on_reconnect = OnReconnect0}) -> - {noreply, State#state{on_reconnect = [OnReconnect, OnReconnect0]}}; +handle_cast({add_disconn_callbk, OnDisconnect}, State = #state{on_disconnect = OldOnDisconnect}) -> + {noreply, State#state{on_disconnect = add_conn_callback(OnDisconnect, OldOnDisconnect)}}; handle_cast(_Msg, State) -> {noreply, State}. @@ -229,16 +237,16 @@ handle_reconnect(_, undefined) -> handle_reconnect(undefined, _) -> ok; handle_reconnect(Client, OnReconnectList) when is_list(OnReconnectList) -> - [OnReconnect(Client) || OnReconnect <- OnReconnectList]; + [safe_exec(OnReconnect, Client) || OnReconnect <- OnReconnectList]; handle_reconnect(Client, OnReconnect) -> - OnReconnect(Client). + safe_exec(OnReconnect, Client). handle_disconnect(undefined, _) -> ok; handle_disconnect(_, undefined) -> ok; handle_disconnect(Client, Disconnect) -> - Disconnect(Client). + safe_exec(Disconnect, Client). connect_internal(State) -> try connect(State) of @@ -254,11 +262,24 @@ connect_internal(State) -> _C:Reason:ST -> {error, {Reason, ST}} end. -safe_exec(Action, Client) when is_pid(Client) -> - try Action(Client) +safe_exec(Action, MainArg) -> + try exec(Action, MainArg) catch E:R:ST -> - logger:error("[PoolWorker] safe_exec failed: ~p", [{E,R,ST}]), + logger:error("[PoolWorker] safe_exec ~p, failed: ~0p", [Action, {E,R,ST}]), {error, {exec_failed, E, R}} - end; -safe_exec(_Action, undefined) -> - {error, worker_disconnected}. + end. + +exec({M, F, A}, MainArg) -> + erlang:apply(M, F, [MainArg]++A); +exec(Action, MainArg) when is_function(Action) -> + Action(MainArg). + +ensure_callback(undefined) -> undefined; +ensure_callback({_,_,_} = Callback) -> Callback. + +add_conn_callback(OnReconnect, OldOnReconnects) when is_list(OldOnReconnects) -> + [OnReconnect | OldOnReconnects]; +add_conn_callback(OnReconnect, undefined) -> + [OnReconnect]; +add_conn_callback(OnReconnect, OldOnReconnect) -> + [OnReconnect, OldOnReconnect]. diff --git a/test/ecpool_SUITE.erl b/test/ecpool_SUITE.erl index 3b79cefcc..0747929d6 100644 --- a/test/ecpool_SUITE.erl +++ b/test/ecpool_SUITE.erl @@ -50,7 +50,9 @@ groups() -> t_restart_client, t_reconnect_client, t_client_exec_hash, + t_client_exec2_hash, t_client_exec_random, + t_client_exec2_random, t_multiprocess_client, t_multiprocess_client_not_restart ]}]. @@ -171,18 +173,23 @@ t_client_exec_hash(_Config) -> ct:pal("----- pid: ~p", [is_process_alive(Pid1)]), ?assertEqual(4, ecpool:with_client(abc, <<"abc">>, fun(Client) -> test_client:plus(Client, 1, 3) - end, direct)), - From = self(), - Callback = fun(Result) -> From ! {result, Result} end, - ?assertEqual(4, ecpool:with_client(abc, <<"abc">>, fun(Client) -> + end)), + ?assertEqual(4, ecpool:with_client(abc, 2, fun(Client) -> test_client:plus(Client, 1, 3) - end, relay)), - ?assertEqual(ok, ecpool:with_client(abc, <<"abc">>, fun(Client) -> - test_client:plus(Client, 1, 3) - end, relay_async)), - ?assertEqual(ok, ecpool:with_client(abc, <<"abc">>, fun(Client) -> - test_client:plus(Client, 1, 3) - end, {relay_async, Callback})), + end)), + ecpool:stop_sup_pool(abc). + +t_client_exec2_hash(_Config) -> + Action = {test_client, plus, [1,3]}, + Opts = [{pool_size, 5}, {pool_type, hash}, {auto_reconnect, false}], + {ok, Pid1} = ecpool:start_pool(abc, test_client, Opts), + ct:pal("----- pid: ~p", [is_process_alive(Pid1)]), + ?assertEqual(4, ecpool:pick_and_do({abc, <<"abc">>}, Action, no_handover)), + ?assertEqual(4, ecpool:pick_and_do({abc, 3}, Action, no_handover)), + Callback = {test_client, callback, [self()]}, + ?assertEqual(4, ecpool:pick_and_do({abc, <<"abc">>}, Action, handover)), + ?assertEqual(ok, ecpool:pick_and_do({abc, 1}, Action, handover_async)), + ?assertEqual(ok, ecpool:pick_and_do({abc, <<"abc">>}, Action, {handover_async, Callback})), receive {result, 4} -> ok; R2 -> ct:fail({unexpected_result, R2}) @@ -193,19 +200,17 @@ t_client_exec_random(_Config) -> ecpool:start_pool(?POOL, test_client, ?POOL_OPTS), ?assertEqual(4, ecpool:with_client(?POOL, fun(Client) -> test_client:plus(Client, 1, 3) - end, direct)), - ?assertEqual(4, ecpool:with_client(?POOL, fun(Client) -> - test_client:plus(Client, 1, 3) - end, relay)), - ?assertEqual(ok, ecpool:with_client(?POOL, fun(Client) -> - test_client:plus(Client, 1, 3) - end, relay_async)), - From = self(), - Callback = fun(Result) -> From ! {result, Result} end, - ?assertEqual(ok, ecpool:with_client(?POOL, fun(Client) -> - test_client:plus(Client, 1, 3) - end, {relay_async, Callback})), + end)). + +t_client_exec2_random(_Config) -> + Action = {test_client, plus, [1,3]}, + ecpool:start_pool(?POOL, test_client, ?POOL_OPTS), + ?assertEqual(4, ecpool:pick_and_do(?POOL, Action, no_handover)), + ?assertEqual(4, ecpool:pick_and_do(?POOL, Action, handover)), + ?assertEqual(ok, ecpool:pick_and_do(?POOL, Action, handover_async)), + Callback = {test_client, callback, [self()]}, + ?assertEqual(ok, ecpool:pick_and_do(?POOL, Action, {handover_async, Callback})), receive {result, 4} -> ok; R1 -> ct:fail({unexpected_result, R1}) - end. + end. \ No newline at end of file diff --git a/test/test_client.erl b/test/test_client.erl index 731384798..44d7542e1 100644 --- a/test/test_client.erl +++ b/test/test_client.erl @@ -24,6 +24,7 @@ -export([ connect/1 , plus/3 + , callback/2 , stop/2 ]). @@ -48,6 +49,9 @@ connect(Opts) -> plus(Pid, L, R) -> gen_server:call(Pid, {plus, L, R}). +callback(Result, SendTo) -> + SendTo ! {result, Result}. + stop(Pid, Reason) -> gen_server:call(Pid, {stop, Reason}).