From dae9633cd51638b6c8b90267db795973847bdb2a Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Thu, 19 Nov 2020 16:50:38 +0800 Subject: [PATCH] feature(worker): relay function calls by workers --- include/ecpool.hrl | 3 +++ src/ecpool.erl | 37 +++++++++++++++++++++++---------- src/ecpool_worker.erl | 38 ++++++++++++++++++++++++++++++++++ test/ecpool_SUITE.erl | 48 +++++++++++++++++++++++++++++++++++++++++++ test/test_client.erl | 11 ++++++++-- 5 files changed, 124 insertions(+), 13 deletions(-) create mode 100644 include/ecpool.hrl diff --git a/include/ecpool.hrl b/include/ecpool.hrl new file mode 100644 index 000000000..8abcbd5d5 --- /dev/null +++ b/include/ecpool.hrl @@ -0,0 +1,3 @@ +-type callback() :: fun((any()) -> any()). +-type action() :: fun((pid()) -> any()). +-type exec_mode() :: relay | relay_async | {relay_async, callback()} | direct. \ No newline at end of file diff --git a/src/ecpool.erl b/src/ecpool.erl index 58b587e91..b8040093b 100644 --- a/src/ecpool.erl +++ b/src/ecpool.erl @@ -16,6 +16,8 @@ -module(ecpool). +-include("ecpool.hrl"). + -export([ pool_spec/4 , start_pool/3 , start_sup_pool/3 @@ -24,6 +26,7 @@ , get_client/2 , with_client/2 , with_client/3 + , with_client/4 , name/1 , workers/1 ]). @@ -91,21 +94,33 @@ add_reconnect_callback(Pool, Callback) -> ok. %% @doc Call the fun with client/connection --spec(with_client(atom(), fun((Client :: pid()) -> any())) -> no_return()). -with_client(Pool, Fun) when is_atom(Pool) -> - with_worker(gproc_pool:pick_worker(name(Pool)), Fun). +-spec(with_client(atom(), action()) -> any()). +with_client(Pool, Action) when is_atom(Pool) -> + with_client(Pool, Action, direct). -%% @doc Call the fun with client/connection --spec(with_client(atom(), any(), fun((Client :: pid()) -> term())) -> no_return()). -with_client(Pool, Key, Fun) when is_atom(Pool) -> - with_worker(gproc_pool:pick_worker(name(Pool), Key), Fun). +-spec(with_client(atom(), action() | term(), action() | exec_mode()) -> any()). +with_client(Pool, Key, Action) when is_atom(Pool), is_function(Action) -> + with_client(Pool, Key, Action, direct); --spec(with_worker(Worker :: pid(), fun((Client :: pid()) -> any())) -> no_return()). -with_worker(Worker, Fun) -> +with_client(Pool, Action, Mode) when is_atom(Pool), is_function(Action) -> + with_worker(gproc_pool:pick_worker(name(Pool)), Action, Mode). + +-spec(with_client(atom(), any(), fun((Client :: pid()) -> term()), exec_mode()) -> any()). +with_client(Pool, Key, Action, Mode) when is_atom(Pool) -> + with_worker(gproc_pool:pick_worker(name(Pool), Key), Action, Mode). + +-spec with_worker(pid(), action(), exec_mode()) -> any(). +with_worker(Worker, Action, direct) -> case ecpool_worker:client(Worker) of - {ok, Client} -> Fun(Client); + {ok, Client} -> Action(Client); {error, Reason} -> {error, Reason} - end. + end; +with_worker(Worker, Action, relay) -> + ecpool_worker:exec(Worker, Action); +with_worker(Worker, Action, relay_async) -> + ecpool_worker:exec_async(Worker, Action); +with_worker(Worker, Action, {relay_async, CallbackFun}) -> + ecpool_worker:exec_async(Worker, Action, CallbackFun). %% @doc Pool workers workers(Pool) -> diff --git a/src/ecpool_worker.erl b/src/ecpool_worker.erl index 813dac9cb..c961083d5 100644 --- a/src/ecpool_worker.erl +++ b/src/ecpool_worker.erl @@ -16,12 +16,17 @@ -module(ecpool_worker). +-include("ecpool.hrl"). + -behaviour(gen_server). -export([start_link/4]). %% API Function Exports -export([ client/1 + , exec/2 + , exec_async/2 + , exec_async/3 , is_connected/1 , set_reconnect_callback/2 , add_reconnect_callback/2 @@ -82,6 +87,18 @@ start_link(Pool, Id, Mod, Opts) -> client(Pid) -> gen_server:call(Pid, client, infinity). +-spec(exec(pid(), action()) -> Result :: any() | {error, Reason :: term()}). +exec(Pid, Action) -> + gen_server:call(Pid, {exec, Action}, infinity). + +-spec(exec_async(pid(), action()) -> Result :: any() | {error, Reason :: term()}). +exec_async(Pid, Action) -> + gen_server:call(Pid, {exec_async, Action}). + +-spec(exec_async(pid(), action(), callback()) -> Result :: any() | {error, Reason :: term()}). +exec_async(Pid, Action, Callback) -> + gen_server:call(Pid, {exec_async, Action, Callback}). + %% @doc Is client connected? -spec(is_connected(pid()) -> boolean()). is_connected(Pid) -> @@ -128,6 +145,19 @@ handle_call(client, _From, State = #state{client = undefined}) -> handle_call(client, _From, State = #state{client = Client}) -> {reply, {ok, Client}, State}; +handle_call({exec, Action}, _From, State = #state{client = Client}) -> + {reply, safe_exec(Action, Client), State}; + +handle_call({exec_async, Action}, From, State = #state{client = Client}) -> + gen_server:reply(From, ok), + _ = safe_exec(Action, Client), + {noreply, State}; + +handle_call({exec_async, Action, Callback}, From, State = #state{client = Client}) -> + gen_server:reply(From, ok), + _ = Callback(safe_exec(Action, Client)), + {noreply, State}; + handle_call(Req, _From, State) -> logger:error("[PoolWorker] unexpected call: ~p", [Req]), {reply, ignored, State}. @@ -237,3 +267,11 @@ connect_internal(State) -> _C:Reason:ST -> {error, {Reason, ST}} end. +safe_exec(Action, Client) when is_pid(Client) -> + try Action(Client) + catch E:R:ST -> + logger:error("[PoolWorker] safe_exec failed: ~p", [{E,R,ST}]), + {error, {exec_failed, E, R}} + end; +safe_exec(_Action, undefined) -> + {error, worker_disconnected}. diff --git a/test/ecpool_SUITE.erl b/test/ecpool_SUITE.erl index b9115ace0..3b79cefcc 100644 --- a/test/ecpool_SUITE.erl +++ b/test/ecpool_SUITE.erl @@ -49,11 +49,14 @@ groups() -> t_start_sup_pool, t_restart_client, t_reconnect_client, + t_client_exec_hash, + t_client_exec_random, t_multiprocess_client, t_multiprocess_client_not_restart ]}]. init_per_suite(Config) -> + {ok, _} = application:ensure_all_started(gproc), {ok, _} = application:ensure_all_started(ecpool), Config. @@ -161,3 +164,48 @@ t_multiprocess_client_not_restart(_Config) -> [begin ?assertEqual(false, ecpool_worker:is_connected(Worker)) end || {_WorkerName, Worker} <- ecpool:workers(?POOL)]. + +t_client_exec_hash(_Config) -> + Opts = [{pool_size, 5}, {pool_type, hash}, {auto_reconnect, false}], + {ok, Pid1} = ecpool:start_pool(abc, test_client, Opts), + ct:pal("----- pid: ~p", [is_process_alive(Pid1)]), + ?assertEqual(4, ecpool:with_client(abc, <<"abc">>, fun(Client) -> + test_client:plus(Client, 1, 3) + end, direct)), + From = self(), + Callback = fun(Result) -> From ! {result, Result} end, + ?assertEqual(4, ecpool:with_client(abc, <<"abc">>, fun(Client) -> + test_client:plus(Client, 1, 3) + end, relay)), + ?assertEqual(ok, ecpool:with_client(abc, <<"abc">>, fun(Client) -> + test_client:plus(Client, 1, 3) + end, relay_async)), + ?assertEqual(ok, ecpool:with_client(abc, <<"abc">>, fun(Client) -> + test_client:plus(Client, 1, 3) + end, {relay_async, Callback})), + receive + {result, 4} -> ok; + R2 -> ct:fail({unexpected_result, R2}) + end, + ecpool:stop_sup_pool(abc). + +t_client_exec_random(_Config) -> + ecpool:start_pool(?POOL, test_client, ?POOL_OPTS), + ?assertEqual(4, ecpool:with_client(?POOL, fun(Client) -> + test_client:plus(Client, 1, 3) + end, direct)), + ?assertEqual(4, ecpool:with_client(?POOL, fun(Client) -> + test_client:plus(Client, 1, 3) + end, relay)), + ?assertEqual(ok, ecpool:with_client(?POOL, fun(Client) -> + test_client:plus(Client, 1, 3) + end, relay_async)), + From = self(), + Callback = fun(Result) -> From ! {result, Result} end, + ?assertEqual(ok, ecpool:with_client(?POOL, fun(Client) -> + test_client:plus(Client, 1, 3) + end, {relay_async, Callback})), + receive + {result, 4} -> ok; + R1 -> ct:fail({unexpected_result, R1}) + end. diff --git a/test/test_client.erl b/test/test_client.erl index 9da63753e..731384798 100644 --- a/test/test_client.erl +++ b/test/test_client.erl @@ -22,8 +22,9 @@ -define(SERVER, ?MODULE). --export([connect/1, - stop/2 +-export([ connect/1 + , plus/3 + , stop/2 ]). -export([init/1, @@ -44,6 +45,9 @@ connect(Opts) -> gen_server:start_link(?MODULE, [Opts], []) end. +plus(Pid, L, R) -> + gen_server:call(Pid, {plus, L, R}). + stop(Pid, Reason) -> gen_server:call(Pid, {stop, Reason}). @@ -57,6 +61,9 @@ init(Args) -> handle_call({stop, Reason}, _From, State) -> {stop, Reason, ok, State}; +handle_call({plus, L, R}, _From, State) -> + {reply, L + R, State}; + handle_call(_Req, _From, State) -> {reply, ok, State}.