Improve the pool design

- Move 'emqx_pool:start_link/0' to emqx_pool_sup module
- Use the new log macros
- Add more test cases
This commit is contained in:
Feng Lee 2019-01-09 17:27:30 +08:00 committed by Feng Lee
parent 73ae8ad57f
commit fe0f5333b3
4 changed files with 52 additions and 30 deletions

View File

@ -25,7 +25,7 @@ start_link() ->
init([]) -> init([]) ->
{ok, {{one_for_one, 10, 100}, {ok, {{one_for_one, 10, 100},
[child_spec(emqx_pool, supervisor), [child_spec(emqx_pool_sup, supervisor),
child_spec(emqx_alarm_mgr, worker), child_spec(emqx_alarm_mgr, worker),
child_spec(emqx_hooks, worker), child_spec(emqx_hooks, worker),
child_spec(emqx_stats, worker), child_spec(emqx_stats, worker),
@ -41,6 +41,7 @@ child_spec(M, worker) ->
shutdown => 5000, shutdown => 5000,
type => worker, type => worker,
modules => [M]}; modules => [M]};
child_spec(M, supervisor) -> child_spec(M, supervisor) ->
#{id => M, #{id => M,
start => {M, start_link, []}, start => {M, start_link, []},

View File

@ -16,9 +16,14 @@
-behaviour(gen_server). -behaviour(gen_server).
-export([start_link/0, start_link/2]). -include("logger.hrl").
-export([start_link/2]).
-export([submit/1, submit/2]). -export([submit/1, submit/2]).
-export([async_submit/1, async_submit/2]). -export([async_submit/1, async_submit/2]).
-ifdef(TEST).
-export([worker/0]).
-endif.
%% gen_server callbacks %% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
@ -28,10 +33,6 @@
-type(task() :: fun() | mfa() | {fun(), Args :: list(any())}). -type(task() :: fun() | mfa() | {fun(), Args :: list(any())}).
%% @doc Start pooler supervisor.
start_link() ->
emqx_pool_sup:start_link(?POOL, random, {?MODULE, start_link, []}).
%% @doc Start pool. %% @doc Start pool.
-spec(start_link(atom(), pos_integer()) -> emqx_types:startlink_ret()). -spec(start_link(atom(), pos_integer()) -> emqx_types:startlink_ret()).
start_link(Pool, Id) -> start_link(Pool, Id) ->
@ -80,22 +81,22 @@ handle_call({submit, Task}, _From, State) ->
{reply, catch run(Task), State}; {reply, catch run(Task), State};
handle_call(Req, _From, State) -> handle_call(Req, _From, State) ->
emqx_logger:error("[Pool] unexpected call: ~p", [Req]), ?ERROR("[Pool] unexpected call: ~p", [Req]),
{reply, ignored, State}. {reply, ignored, State}.
handle_cast({async_submit, Task}, State) -> handle_cast({async_submit, Task}, State) ->
try run(Task) try run(Task)
catch _:Error:Stacktrace -> catch _:Error:Stacktrace ->
emqx_logger:error("[Pool] error: ~p, ~p", [Error, Stacktrace]) ?ERROR("[Pool] error: ~p, ~p", [Error, Stacktrace])
end, end,
{noreply, State}; {noreply, State};
handle_cast(Msg, State) -> handle_cast(Msg, State) ->
emqx_logger:error("[Pool] unexpected cast: ~p", [Msg]), ?ERROR("[Pool] unexpected cast: ~p", [Msg]),
{noreply, State}. {noreply, State}.
handle_info(Info, State) -> handle_info(Info, State) ->
emqx_logger:error("[Pool] unexpected info: ~p", [Info]), ?ERROR("[Pool] unexpected info: ~p", [Info]),
{noreply, State}. {noreply, State}.
terminate(_Reason, #{pool := Pool, id := Id}) -> terminate(_Reason, #{pool := Pool, id := Id}) ->

View File

@ -16,10 +16,13 @@
-behaviour(supervisor). -behaviour(supervisor).
-export([spec/1, spec/2, start_link/3, start_link/4]). -export([spec/1, spec/2]).
-export([start_link/0, start_link/3, start_link/4]).
-export([init/1]). -export([init/1]).
-define(POOL, emqx_pool).
-spec(spec(list()) -> supervisor:child_spec()). -spec(spec(list()) -> supervisor:child_spec()).
spec(Args) -> spec(Args) ->
spec(pool_sup, Args). spec(pool_sup, Args).
@ -33,6 +36,10 @@ spec(ChildId, Args) ->
type => supervisor, type => supervisor,
modules => [?MODULE]}. modules => [?MODULE]}.
%% @doc Start the default pool supervisor.
start_link() ->
start_link(?POOL, random, {?POOL, start_link, []}).
-spec(start_link(atom() | tuple(), atom(), mfa()) -> {ok, pid()} | {error, term()}). -spec(start_link(atom() | tuple(), atom(), mfa()) -> {ok, pid()} | {error, term()}).
start_link(Pool, Type, MFA) -> start_link(Pool, Type, MFA) ->
start_link(Pool, Type, emqx_vm:schedulers(), MFA). start_link(Pool, Type, emqx_vm:schedulers(), MFA).

View File

@ -15,22 +15,22 @@
-module(emqx_pool_SUITE). -module(emqx_pool_SUITE).
-compile(export_all). -compile(export_all).
-compile(nowarn_export_all). -compile(nowarn_export_all).
-include("emqx_mqtt.hrl"). -include("emqx_mqtt.hrl").
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
all() -> [ all() ->
{group, submit_case}, [
{group, async_submit_case} {group, submit_case},
]. {group, async_submit_case},
t_unexpected
].
groups() -> groups() ->
[ [
{submit_case, [sequence], [submit_mfa, submit_fa]}, {submit_case, [sequence], [submit_mfa, submit_fa]},
{async_submit_case, [sequence], [async_submit_mfa]} {async_submit_case, [sequence], [async_submit_mfa, async_submit_ex]}
]. ].
init_per_suite(Config) -> init_per_suite(Config) ->
@ -40,26 +40,39 @@ init_per_suite(Config) ->
end_per_suite(_Config) -> end_per_suite(_Config) ->
ok. ok.
init_per_testcase(_, Config) ->
{ok, Sup} = emqx_pool_sup:start_link(),
[{pool_sup, Sup}|Config].
end_per_testcase(_, Config) ->
Sup = proplists:get_value(pool_sup, Config),
exit(Sup, normal).
submit_mfa(_Config) -> submit_mfa(_Config) ->
erlang:process_flag(trap_exit, true),
{ok, Pid} = emqx_pool:start_link(),
Result = emqx_pool:submit({?MODULE, test_mfa, []}), Result = emqx_pool:submit({?MODULE, test_mfa, []}),
?assertEqual(15, Result), ?assertEqual(15, Result).
gen_server:stop(Pid, normal, 3000),
ok.
submit_fa(_Config) -> submit_fa(_Config) ->
{ok, Pid} = emqx_pool:start_link(),
Fun = fun(X) -> case X rem 2 of 0 -> {true, X div 2}; _ -> false end end, Fun = fun(X) -> case X rem 2 of 0 -> {true, X div 2}; _ -> false end end,
Result = emqx_pool:submit(Fun, [2]), Result = emqx_pool:submit(Fun, [2]),
?assertEqual({true, 1}, Result), ?assertEqual({true, 1}, Result).
exit(Pid, normal).
async_submit_mfa(_Config) ->
emqx_pool:async_submit({?MODULE, test_mfa, []}),
emqx_pool:async_submit(fun ?MODULE:test_mfa/0, []).
async_submit_ex(_) ->
emqx_pool:async_submit(fun error_fun/0).
t_unexpected(_) ->
Pid = emqx_pool:worker(),
?assertEqual(ignored, gen_server:call(Pid, bad_request)),
?assertEqual(ok, gen_server:cast(Pid, bad_msg)),
Pid ! bad_info,
ok = gen_server:stop(Pid).
test_mfa() -> test_mfa() ->
lists:foldl(fun(X, Sum) -> X + Sum end, 0, [1,2,3,4,5]). lists:foldl(fun(X, Sum) -> X + Sum end, 0, [1,2,3,4,5]).
async_submit_mfa(_Config) -> error_fun() -> error(test_error).
{ok, Pid} = emqx_pool:start_link(),
emqx_pool:async_submit({?MODULE, test_mfa, []}),
exit(Pid, normal).