%%--------------------------------------------------------------------
%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%%     http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------

-module(ecpool_SUITE).

-compile(export_all).
-compile(nowarn_export_all).

-include_lib("eunit/include/eunit.hrl").

%% Name of the pool shared by most test cases.
-define(POOL, test_pool).

%% Default option list handed to the pool by the test cases that use it.
-define(POOL_OPTS,
        [%% schedulers number
         {pool_size, 10},
         %% round-robin | random | hash
         {pool_type, random},
         %% false | pos_integer()
         {auto_reconnect, false},
         %% DB Parameters
         {host, "localhost"},
         {port, 5432},
         {username, "feng"},
         {password, ""},
         {database, "mqtt"},
         {encoding, utf8}
        ]).

%% Common Test entry point: a single group named `all'.
all() ->
    [{group, all}].

%% The `all' group lists every test case and carries the `sequence' property.
groups() ->
    TestCases = [t_start_pool,
                 t_start_sup_pool,
                 t_restart_client,
                 t_reconnect_client,
                 t_client_exec_hash,
                 t_client_exec2_hash,
                 t_client_exec_random,
                 t_client_exec2_random,
                 t_multiprocess_client,
                 t_multiprocess_client_not_restart
                ],
    [{all, [sequence], TestCases}].

%% Starts gproc and then ecpool (with their dependencies); the suite crashes
%% with a badmatch if either fails to start. Config is returned unchanged.
init_per_suite(Config) ->
    lists:foreach(fun(App) ->
                      {ok, _} = application:ensure_all_started(App)
                  end,
                  [gproc, ecpool]),
    Config.

%% Stops the applications in the reverse of their start order.
end_per_suite(_Config) ->
    lists:foreach(fun(App) -> ok = application:stop(App) end,
                  [ecpool, gproc]).
%% Starts ?POOL with ?POOL_OPTS, asserts that it has 10 workers, then borrows
%% a client ten times and prints each one.
t_start_pool(_Config) ->
    ecpool:start_pool(?POOL, test_client, ?POOL_OPTS),
    ?assertEqual(10, length(ecpool:workers(?POOL))),
    ?debugFmt("~p~n", [ecpool:workers(?POOL)]),
    PrintClient =
        fun(I) ->
            ecpool:with_client(?POOL,
                               fun(Client) ->
                                   ?debugFmt("Call ~p: ~p~n", [I, Client])
                               end)
        end,
    _ = [PrintClient(I) || I <- lists:seq(1, 10)],
    ok.

%% Starts two supervised pools, asserts that ecpool_sup:pools/0 lists exactly
%% those two, then stops both and asserts that the list is empty.
t_start_sup_pool(_Config) ->
    {ok, XPid} = ecpool:start_sup_pool(xpool, test_client, ?POOL_OPTS),
    {ok, YPid} = ecpool:start_sup_pool(ypool, test_client, ?POOL_OPTS),
    Expected = [{xpool, XPid}, {ypool, YPid}],
    ?assertEqual(Expected, lists:sort(ecpool_sup:pools())),
    lists:foreach(fun(Pool) -> ecpool:stop_sup_pool(Pool) end, [ypool, xpool]),
    ?assertEqual([], ecpool_sup:pools()).

%% Starts a 4-worker pool with no auto_reconnect option and stops one client
%% at a time with different exit reasons. The assertions expect the worker
%% count to drop to 3 after `normal', to 2 after `{shutdown, x}', and to still
%% be 2 after `badarg' (checked after a 100 ms pause).
t_restart_client(_Config) ->
    ecpool:start_pool(?POOL, test_client, [{pool_size, 4}]),
    ?assertEqual(4, length(ecpool:workers(?POOL))),
    StopOne =
        fun(Reason) ->
            ecpool:with_client(?POOL,
                               fun(Client) ->
                                   test_client:stop(Client, Reason)
                               end)
        end,

    StopOne(normal),
    ?debugFmt("~n~p~n", [ecpool:workers(?POOL)]),
    ?assertEqual(3, length(ecpool:workers(?POOL))),

    StopOne({shutdown, x}),
    ?debugFmt("~n~p~n", [ecpool:workers(?POOL)]),
    ?assertEqual(2, length(ecpool:workers(?POOL))),

    StopOne(badarg),
    timer:sleep(100),
    ?debugFmt("~n~p~n", [ecpool:workers(?POOL)]),
    ?assertEqual(2, length(ecpool:workers(?POOL))).
%% Starts a 4-worker pool with {auto_reconnect, 1} and stops one client.
%% Asserts that at least one worker reports "not connected" right after the
%% stop and that none does after an 1100 ms pause. It then stops a client with
%% `{shutdown, badarg}', asserts a disconnected worker again, and finally
%% asserts that the pool still has 4 workers.
%% NOTE(review): auto_reconnect is presumably in seconds, given the 1100 ms
%% waits - confirm against ecpool.
t_reconnect_client(_Config) ->
    ecpool:start_pool(?POOL, test_client, [{pool_size, 4}, {auto_reconnect, 1}]),
    ?assertEqual(4, length(ecpool:workers(?POOL))),
    %% Snapshot of is_connected/1 for every worker currently in the pool.
    ConnStates =
        fun() ->
            [ecpool_worker:is_connected(Pid) || {_, Pid} <- ecpool:workers(?POOL)]
        end,

    ecpool:with_client(?POOL, fun(Client) -> test_client:stop(Client, normal) end),
    ?assert(lists:member(false, ConnStates())),
    timer:sleep(1100),
    ?assertNot(lists:member(false, ConnStates())),

    ecpool:with_client(?POOL,
                       fun(Client) ->
                           test_client:stop(Client, {shutdown, badarg})
                       end),
    ?assert(lists:member(false, ConnStates())),
    timer:sleep(1100),
    ?assertEqual(4, length(ecpool:workers(?POOL))).

%% Starts a 4-worker pool with {multiprocess, true} and {auto_reconnect, 1}.
%% Each worker's client is expected to be a pair of live pids. After the first
%% pid of one client is stopped and 1500 ms have passed, every worker must be
%% connected again with a live pair that differs from the stopped one.
t_multiprocess_client(_Config) ->
    ecpool:start_pool(?POOL, test_client,
                      [{pool_size, 4}, {auto_reconnect, 1}, {multiprocess, true}]),
    ?assertEqual(4, length(ecpool:workers(?POOL))),
    %% check client status
    lists:foreach(
      fun(Worker) ->
          true = ecpool_worker:is_connected(Worker),
          {ok, {PidA, PidB}} = ecpool_worker:client(Worker),
          true = is_process_alive(PidA),
          true = is_process_alive(PidB)
      end,
      [W || {_Name, W} <- ecpool:workers(?POOL)]),
    %% stop one of the clients
    Stopped = ecpool:with_client(?POOL,
                                 fun({First, _Second} = Pair) ->
                                     test_client:stop(First, normal),
                                     Pair
                                 end),
    ct:sleep(1500),
    %% check that client is reconnected
    lists:map(
      fun(Worker) ->
          true = ecpool_worker:is_connected(Worker),
          {ok, {PidC, PidD} = Current} = ecpool_worker:client(Worker),
          true = is_process_alive(PidC),
          true = is_process_alive(PidD),
          ?assert(Current =/= Stopped)
      end,
      [W || {_Name, W} <- ecpool:workers(?POOL)]).
%% Starts a 4-worker pool with {multiprocess, true} and {auto_reconnect, false}.
%% After the first pid of every worker's client pair is stopped and 1500 ms
%% have passed, every worker must report "not connected".
t_multiprocess_client_not_restart(_Config) ->
    ecpool:start_pool(?POOL, test_client,
                      [{pool_size, 4}, {auto_reconnect, false}, {multiprocess, true}]),
    ?assertEqual(4, length(ecpool:workers(?POOL))),
    %% Worker pids currently registered in the pool.
    WorkerPids = fun() -> [W || {_Name, W} <- ecpool:workers(?POOL)] end,
    %% check client status
    lists:foreach(
      fun(Worker) ->
          true = ecpool_worker:is_connected(Worker),
          {ok, {PidA, PidB}} = ecpool_worker:client(Worker),
          true = is_process_alive(PidA),
          true = is_process_alive(PidB)
      end,
      WorkerPids()),
    %% stop all the clients
    lists:foreach(
      fun(Worker) ->
          true = ecpool_worker:is_connected(Worker),
          {ok, {First, _Second}} = ecpool_worker:client(Worker),
          test_client:stop(First, normal)
      end,
      WorkerPids()),
    ct:sleep(1500),
    %% check that all the clients are disconnected and not restarted.
    lists:map(
      fun(Worker) ->
          ?assertEqual(false, ecpool_worker:is_connected(Worker))
      end,
      WorkerPids()).

%% Starts a 5-worker hash pool named `abc' and runs test_client:plus(_, 1, 3)
%% through ecpool:with_client/3 with a binary key and an integer key,
%% expecting 4 both times.
%% NOTE(review): the pool is started with start_pool/3 but stopped with
%% stop_sup_pool/1 - confirm that this pairing is intended.
t_client_exec_hash(_Config) ->
    Opts = [{pool_size, 5}, {pool_type, hash}, {auto_reconnect, false}],
    {ok, PoolPid} = ecpool:start_pool(abc, test_client, Opts),
    ct:pal("----- pid: ~p", [is_process_alive(PoolPid)]),
    Plus = fun(Client) -> test_client:plus(Client, 1, 3) end,
    ?assertEqual(4, ecpool:with_client(abc, <<"abc">>, Plus)),
    ?assertEqual(4, ecpool:with_client(abc, 2, Plus)),
    ecpool:stop_sup_pool(abc).
%% Starts a 5-worker hash pool named `abc' and runs the {test_client, plus,
%% [1,3]} action through ecpool:pick_and_do/3 in each mode: no_handover,
%% handover, handover_async, and handover_async with a callback. The
%% synchronous modes must return 4, the asynchronous ones `ok', and the
%% callback must deliver `{result, 4}' to this process.
t_client_exec2_hash(_Config) ->
    Action = {test_client, plus, [1,3]},
    Opts = [{pool_size, 5}, {pool_type, hash}, {auto_reconnect, false}],
    {ok, Pid1} = ecpool:start_pool(abc, test_client, Opts),
    ct:pal("----- pid: ~p", [is_process_alive(Pid1)]),
    ?assertEqual(4, ecpool:pick_and_do({abc, <<"abc">>}, Action, no_handover)),
    ?assertEqual(4, ecpool:pick_and_do({abc, 3}, Action, no_handover)),
    Callback = {test_client, callback, [self()]},
    ?assertEqual(4, ecpool:pick_and_do({abc, <<"abc">>}, Action, handover)),
    ?assertEqual(ok, ecpool:pick_and_do({abc, 1}, Action, handover_async)),
    ?assertEqual(ok, ecpool:pick_and_do({abc, <<"abc">>}, Action, {handover_async, Callback})),
    await_callback_result(4),
    ecpool:stop_sup_pool(abc).

%% Starts ?POOL with ?POOL_OPTS and checks that test_client:plus(_, 1, 3),
%% run through ecpool:with_client/2, returns 4.
t_client_exec_random(_Config) ->
    ecpool:start_pool(?POOL, test_client, ?POOL_OPTS),
    Plus = fun(Client) -> test_client:plus(Client, 1, 3) end,
    ?assertEqual(4, ecpool:with_client(?POOL, Plus)).

%% Same checks as t_client_exec2_hash/1, but against ?POOL started with
%% ?POOL_OPTS and addressed by pool name only (no hash key).
t_client_exec2_random(_Config) ->
    Action = {test_client, plus, [1,3]},
    ecpool:start_pool(?POOL, test_client, ?POOL_OPTS),
    ?assertEqual(4, ecpool:pick_and_do(?POOL, Action, no_handover)),
    ?assertEqual(4, ecpool:pick_and_do(?POOL, Action, handover)),
    ?assertEqual(ok, ecpool:pick_and_do(?POOL, Action, handover_async)),
    Callback = {test_client, callback, [self()]},
    ?assertEqual(ok, ecpool:pick_and_do(?POOL, Action, {handover_async, Callback})),
    await_callback_result(4).

%% Waits for the `{result, Expected}' message that the async callback is
%% expected to send to the calling process. Any other message fails the test
%% case with `{unexpected_result, Msg}'. The `after' clause bounds the wait:
%% if nothing arrives within 5000 ms the test case fails with
%% `callback_timeout' instead of blocking until the Common Test timetrap.
await_callback_result(Expected) ->
    receive
        {result, Expected} -> ok;
        Other -> ct:fail({unexpected_result, Other})
    after 5000 ->
        ct:fail(callback_timeout)
    end.