128 lines
3.9 KiB
Erlang
128 lines
3.9 KiB
Erlang
%%--------------------------------------------------------------------
|
|
%% Copyright (c) 2021-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
%%
|
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
%% you may not use this file except in compliance with the License.
|
|
%% You may obtain a copy of the License at
|
|
%%
|
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
%%
|
|
%% Unless required by applicable law or agreed to in writing, software
|
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
%% See the License for the specific language governing permissions and
|
|
%% limitations under the License.
|
|
%%--------------------------------------------------------------------
|
|
|
|
-module(emqx_resource_pool).
|
|
|
|
-export([
|
|
start/3,
|
|
stop/1,
|
|
health_check_timeout/0,
|
|
health_check_workers/2,
|
|
health_check_workers/3,
|
|
health_check_workers/4
|
|
]).
|
|
|
|
-include_lib("emqx/include/logger.hrl").
|
|
-include("emqx_resource.hrl").
|
|
|
|
-ifndef(TEST).
|
|
-define(HEALTH_CHECK_TIMEOUT, 15000).
|
|
-else.
|
|
%% make tests faster
|
|
-define(HEALTH_CHECK_TIMEOUT, 1000).
|
|
-endif.
|
|
|
|
start(Name, Mod, Options) ->
|
|
case ecpool:start_sup_pool(Name, Mod, Options) of
|
|
{ok, _} ->
|
|
?SLOG(info, #{msg => "start_ecpool_ok", pool_name => Name}, #{tag => ?TAG}),
|
|
ok;
|
|
{error, {already_started, _Pid}} ->
|
|
stop(Name),
|
|
start(Name, Mod, Options);
|
|
{error, Reason} ->
|
|
NReason = parse_reason(Reason),
|
|
IsDryRun = emqx_resource:is_dry_run(Name),
|
|
?SLOG(
|
|
?LOG_LEVEL(IsDryRun),
|
|
#{
|
|
msg => "start_ecpool_error",
|
|
resource_id => Name,
|
|
reason => NReason
|
|
},
|
|
#{tag => ?TAG}
|
|
),
|
|
{error, {start_pool_failed, Name, NReason}}
|
|
end.
|
|
|
|
stop(Name) ->
|
|
case ecpool:stop_sup_pool(Name) of
|
|
ok ->
|
|
?SLOG(info, #{msg => "stop_ecpool_ok", pool_name => Name}, #{tag => ?TAG});
|
|
{error, not_found} ->
|
|
ok;
|
|
{error, Reason} ->
|
|
IsDryRun = emqx_resource:is_dry_run(Name),
|
|
?SLOG(
|
|
?LOG_LEVEL(IsDryRun),
|
|
#{
|
|
msg => "stop_ecpool_failed",
|
|
resource_id => Name,
|
|
reason => Reason
|
|
},
|
|
#{tag => ?TAG}
|
|
),
|
|
error({stop_pool_failed, Name, Reason})
|
|
end.
|
|
|
|
health_check_timeout() ->
|
|
?HEALTH_CHECK_TIMEOUT.
|
|
|
|
health_check_workers(PoolName, CheckFunc) ->
|
|
health_check_workers(PoolName, CheckFunc, ?HEALTH_CHECK_TIMEOUT, _Opts = #{}).
|
|
|
|
health_check_workers(PoolName, CheckFunc, Timeout) ->
|
|
health_check_workers(PoolName, CheckFunc, Timeout, _Opts = #{}).
|
|
|
|
health_check_workers(PoolName, CheckFunc, Timeout, Opts) ->
|
|
ReturnValues = maps:get(return_values, Opts, false),
|
|
Workers = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)],
|
|
DoPerWorker =
|
|
fun(Worker) ->
|
|
case ecpool_worker:client(Worker) of
|
|
{ok, Conn} ->
|
|
erlang:is_process_alive(Conn) andalso
|
|
ecpool_worker:exec(Worker, CheckFunc, Timeout);
|
|
Error ->
|
|
Error
|
|
end
|
|
end,
|
|
Results =
|
|
try
|
|
{ok, emqx_utils:pmap(DoPerWorker, Workers, Timeout)}
|
|
catch
|
|
exit:timeout ->
|
|
{error, timeout}
|
|
end,
|
|
case ReturnValues of
|
|
true ->
|
|
Results;
|
|
false ->
|
|
case Results of
|
|
{ok, []} -> false;
|
|
{ok, Rs = [_ | _]} -> lists:all(fun(St) -> St =:= true end, Rs);
|
|
_ -> false
|
|
end
|
|
end.
|
|
|
|
parse_reason({
|
|
{shutdown, {failed_to_start_child, _, {shutdown, {failed_to_start_child, _, Reason}}}},
|
|
_
|
|
}) ->
|
|
Reason;
|
|
parse_reason(Reason) ->
|
|
Reason.
|