emqx/test/ecpool_SUITE.erl

216 lines
8.5 KiB
Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(ecpool_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl").
-define(POOL, test_pool).
-define(POOL_OPTS,
[%% schedulers number
{pool_size, 10},
%% round-robbin | random | hash
{pool_type, random},
%% false | pos_integer()
{auto_reconnect, false},
%% DB Parameters
{host, "localhost"},
{port, 5432},
{username, "feng"},
{password, ""},
{database, "mqtt"},
{encoding, utf8}
]).
all() ->
[{group, all}].
groups() ->
[{all, [sequence],
[t_start_pool,
t_start_sup_pool,
t_restart_client,
t_reconnect_client,
t_client_exec_hash,
t_client_exec2_hash,
t_client_exec_random,
t_client_exec2_random,
t_multiprocess_client,
t_multiprocess_client_not_restart
]}].
init_per_suite(Config) ->
{ok, _} = application:ensure_all_started(gproc),
{ok, _} = application:ensure_all_started(ecpool),
Config.
end_per_suite(_Config) ->
ok = application:stop(ecpool),
ok = application:stop(gproc).
t_start_pool(_Config) ->
ecpool:start_pool(?POOL, test_client, ?POOL_OPTS),
?assertEqual(10, length(ecpool:workers(test_pool))),
?debugFmt("~p~n", [ecpool:workers(test_pool)]),
lists:foreach(fun(I) ->
ecpool:with_client(?POOL, fun(Client) ->
?debugFmt("Call ~p: ~p~n", [I, Client])
end)
end, lists:seq(1, 10)).
t_start_sup_pool(_Config) ->
{ok, Pid1} = ecpool:start_sup_pool(xpool, test_client, ?POOL_OPTS),
{ok, Pid2} = ecpool:start_sup_pool(ypool, test_client, ?POOL_OPTS),
?assertEqual([{xpool, Pid1}, {ypool, Pid2}], lists:sort(ecpool_sup:pools())),
ecpool:stop_sup_pool(ypool),
ecpool:stop_sup_pool(xpool),
?assertEqual([], ecpool_sup:pools()).
t_restart_client(_Config) ->
ecpool:start_pool(?POOL, test_client, [{pool_size, 4}]),
?assertEqual(4, length(ecpool:workers(?POOL))),
ecpool:with_client(?POOL, fun(Client) -> test_client:stop(Client, normal) end),
?debugFmt("~n~p~n", [ecpool:workers(?POOL)]),
?assertEqual(3, length(ecpool:workers(?POOL))),
ecpool:with_client(?POOL, fun(Client) -> test_client:stop(Client, {shutdown, x}) end),
?debugFmt("~n~p~n", [ecpool:workers(?POOL)]),
?assertEqual(2, length(ecpool:workers(?POOL))),
ecpool:with_client(?POOL, fun(Client) ->
test_client:stop(Client, badarg)
end),
timer:sleep(100),
?debugFmt("~n~p~n", [ecpool:workers(?POOL)]),
?assertEqual(2, length(ecpool:workers(?POOL))).
t_reconnect_client(_Config) ->
ecpool:start_pool(?POOL, test_client, [{pool_size, 4}, {auto_reconnect, 1}]),
?assertEqual(4, length(ecpool:workers(?POOL))),
ecpool:with_client(?POOL, fun(Client) ->
test_client:stop(Client, normal)
end),
?assert(lists:member(false, [ecpool_worker:is_connected(Pid) || {_, Pid} <- ecpool:workers(?POOL)])),
timer:sleep(1100),
?assertNot(lists:member(false, [ecpool_worker:is_connected(Pid) || {_, Pid} <- ecpool:workers(?POOL)])),
ecpool:with_client(?POOL, fun(Client) ->
test_client:stop(Client, {shutdown, badarg})
end),
?assert(lists:member(false, [ecpool_worker:is_connected(Pid) || {_, Pid} <- ecpool:workers(?POOL)])),
timer:sleep(1100),
?assertEqual(4, length(ecpool:workers(?POOL))).
t_multiprocess_client(_Config) ->
ecpool:start_pool(?POOL, test_client, [{pool_size, 4}, {auto_reconnect, 1}, {multiprocess, true}]),
?assertEqual(4, length(ecpool:workers(?POOL))),
%% check client status
[begin
true = ecpool_worker:is_connected(Worker),
{ok, _C = {Pid1, Pid2}} = ecpool_worker:client(Worker),
true = is_process_alive(Pid1),
true = is_process_alive(Pid2)
end || {_WorkerName, Worker} <- ecpool:workers(?POOL)],
%% stop one of the clients
Client = ecpool:with_client(?POOL,
fun(C = {P1, _P2}) ->
test_client:stop(P1, normal), C
end),
ct:sleep(1500),
%% check that client is reconnected
[begin
true = ecpool_worker:is_connected(Worker),
{ok, Client2 = {Pid3, Pid4}} = ecpool_worker:client(Worker),
true = is_process_alive(Pid3),
true = is_process_alive(Pid4),
?assert(Client2 =/= Client)
end || {_WorkerName, Worker} <- ecpool:workers(?POOL)].
t_multiprocess_client_not_restart(_Config) ->
ecpool:start_pool(?POOL, test_client, [{pool_size, 4}, {auto_reconnect, false}, {multiprocess, true}]),
?assertEqual(4, length(ecpool:workers(?POOL))),
%% check client status
[begin
true = ecpool_worker:is_connected(Worker),
{ok, {Pid1, Pid2}} = ecpool_worker:client(Worker),
true = is_process_alive(Pid1),
true = is_process_alive(Pid2)
end || {_WorkerName, Worker} <- ecpool:workers(?POOL)],
%% stop all the clients
[begin
true = ecpool_worker:is_connected(Worker),
{ok, {Pid1, _Pid2}} = ecpool_worker:client(Worker),
test_client:stop(Pid1, normal)
end || {_WorkerName, Worker} <- ecpool:workers(?POOL)],
ct:sleep(1500),
%% check that all the clients are disconnected and not restarted.
[begin
?assertEqual(false, ecpool_worker:is_connected(Worker))
end || {_WorkerName, Worker} <- ecpool:workers(?POOL)].
t_client_exec_hash(_Config) ->
Opts = [{pool_size, 5}, {pool_type, hash}, {auto_reconnect, false}],
{ok, Pid1} = ecpool:start_pool(abc, test_client, Opts),
ct:pal("----- pid: ~p", [is_process_alive(Pid1)]),
?assertEqual(4, ecpool:with_client(abc, <<"abc">>, fun(Client) ->
test_client:plus(Client, 1, 3)
end)),
?assertEqual(4, ecpool:with_client(abc, 2, fun(Client) ->
test_client:plus(Client, 1, 3)
end)),
ecpool:stop_sup_pool(abc).
t_client_exec2_hash(_Config) ->
Action = {test_client, plus, [1,3]},
Opts = [{pool_size, 5}, {pool_type, hash}, {auto_reconnect, false}],
{ok, Pid1} = ecpool:start_pool(abc, test_client, Opts),
ct:pal("----- pid: ~p", [is_process_alive(Pid1)]),
?assertEqual(4, ecpool:pick_and_do({abc, <<"abc">>}, Action, no_handover)),
?assertEqual(4, ecpool:pick_and_do({abc, 3}, Action, no_handover)),
Callback = {test_client, callback, [self()]},
?assertEqual(4, ecpool:pick_and_do({abc, <<"abc">>}, Action, handover)),
?assertEqual(ok, ecpool:pick_and_do({abc, 1}, Action, handover_async)),
?assertEqual(ok, ecpool:pick_and_do({abc, <<"abc">>}, Action, {handover_async, Callback})),
receive
{result, 4} -> ok;
R2 -> ct:fail({unexpected_result, R2})
end,
ecpool:stop_sup_pool(abc).
t_client_exec_random(_Config) ->
ecpool:start_pool(?POOL, test_client, ?POOL_OPTS),
?assertEqual(4, ecpool:with_client(?POOL, fun(Client) ->
test_client:plus(Client, 1, 3)
end)).
t_client_exec2_random(_Config) ->
Action = {test_client, plus, [1,3]},
ecpool:start_pool(?POOL, test_client, ?POOL_OPTS),
?assertEqual(4, ecpool:pick_and_do(?POOL, Action, no_handover)),
?assertEqual(4, ecpool:pick_and_do(?POOL, Action, handover)),
?assertEqual(ok, ecpool:pick_and_do(?POOL, Action, handover_async)),
Callback = {test_client, callback, [self()]},
?assertEqual(ok, ecpool:pick_and_do(?POOL, Action, {handover_async, Callback})),
receive
{result, 4} -> ok;
R1 -> ct:fail({unexpected_result, R1})
end.