emqx/apps/emqx_resource/src/emqx_resource_pool.erl

128 lines
3.9 KiB
Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2021-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_resource_pool).
-export([
start/3,
stop/1,
health_check_timeout/0,
health_check_workers/2,
health_check_workers/3,
health_check_workers/4
]).
-include_lib("emqx/include/logger.hrl").
-include("emqx_resource.hrl").
-ifndef(TEST).
-define(HEALTH_CHECK_TIMEOUT, 15000).
-else.
%% make tests faster
-define(HEALTH_CHECK_TIMEOUT, 1000).
-endif.
%% Start a supervised ecpool pool called `Name`, using `Mod` as the worker
%% callback module and `Options` as the pool options.
%%
%% Returns `ok` once the pool is up. If a pool with this name is already
%% running it is stopped first and the start is retried. Any other failure is
%% logged (at a level chosen by ?LOG_LEVEL from the dry-run flag) and returned
%% as `{error, {start_pool_failed, Name, Reason}}`.
start(Name, Mod, Options) ->
    StartResult = ecpool:start_sup_pool(Name, Mod, Options),
    case StartResult of
        {error, {already_started, _Pid}} ->
            %% Replace the existing pool: tear it down, then start again.
            stop(Name),
            start(Name, Mod, Options);
        {error, RawReason} ->
            %% Strip the nested supervisor wrappers before logging/returning.
            Cause = parse_reason(RawReason),
            DryRun = emqx_resource:is_dry_run(Name),
            ?SLOG(
                ?LOG_LEVEL(DryRun),
                #{
                    msg => "start_ecpool_error",
                    resource_id => Name,
                    reason => Cause
                },
                #{tag => ?TAG}
            ),
            {error, {start_pool_failed, Name, Cause}};
        {ok, _} ->
            ?SLOG(info, #{msg => "start_ecpool_ok", pool_name => Name}, #{tag => ?TAG}),
            ok
    end.
%% Stop the supervised ecpool pool called `Name`.
%%
%% Returns `ok` when the pool was stopped, or when ecpool reports
%% `{error, not_found}` (nothing to stop). Any other error is logged (at a
%% level chosen by ?LOG_LEVEL from the dry-run flag) and then raised as
%% `error({stop_pool_failed, Name, Reason})`.
stop(Name) ->
    case ecpool:stop_sup_pool(Name) of
        ok ->
            ?SLOG(info, #{msg => "stop_ecpool_ok", pool_name => Name}, #{tag => ?TAG}),
            %% Return `ok` explicitly (as start/3 does) rather than relying on
            %% whatever value the logging macro happens to evaluate to.
            ok;
        {error, not_found} ->
            %% Pool is already gone; treat as success.
            ok;
        {error, Reason} ->
            IsDryRun = emqx_resource:is_dry_run(Name),
            ?SLOG(
                ?LOG_LEVEL(IsDryRun),
                #{
                    msg => "stop_ecpool_failed",
                    resource_id => Name,
                    reason => Reason
                },
                #{tag => ?TAG}
            ),
            error({stop_pool_failed, Name, Reason})
    end.
%% Default timeout used by the health-check helpers in this module.
%% The value is the ?HEALTH_CHECK_TIMEOUT macro: 15000 in regular builds and
%% 1000 when TEST is defined (presumably milliseconds -- confirm against the
%% ecpool_worker:exec/3 contract).
health_check_timeout() -> ?HEALTH_CHECK_TIMEOUT.
%% Health-check every worker of `PoolName` with `CheckFunc`, using the default
%% timeout (?HEALTH_CHECK_TIMEOUT) and no extra options.
health_check_workers(PoolName, CheckFunc) ->
    DefaultTimeout = ?HEALTH_CHECK_TIMEOUT,
    NoOpts = #{},
    health_check_workers(PoolName, CheckFunc, DefaultTimeout, NoOpts).
%% Health-check every worker of `PoolName` with `CheckFunc` and an explicit
%% `Timeout`, with no extra options (so a single boolean is returned).
health_check_workers(PoolName, CheckFunc, Timeout) ->
    NoOpts = #{},
    health_check_workers(PoolName, CheckFunc, Timeout, NoOpts).
%% Run `CheckFunc` on every worker of `PoolName` in parallel.
%%
%% For each worker the client connection is looked up; if that lookup does not
%% return `{ok, Conn}` its result is used as the worker's outcome, if the
%% connection process is not alive the outcome is `false`, otherwise the
%% outcome is whatever `ecpool_worker:exec(Worker, CheckFunc, Timeout)` returns.
%%
%% Options:
%%   return_values (default false)
%%     true  -> return `{ok, Outcomes}` or `{error, timeout}` as-is;
%%     false -> return `true` only when there is at least one worker and every
%%              outcome is exactly `true`; `false` otherwise (including timeout
%%              and an empty pool).
health_check_workers(PoolName, CheckFunc, Timeout, Opts) ->
    WantValues = maps:get(return_values, Opts, false),
    WorkerPids = [Pid || {_Name, Pid} <- ecpool:workers(PoolName)],
    CheckOne =
        fun(Pid) ->
            case ecpool_worker:client(Pid) of
                {ok, Conn} ->
                    case erlang:is_process_alive(Conn) of
                        true -> ecpool_worker:exec(Pid, CheckFunc, Timeout);
                        false -> false
                    end;
                NotConnected ->
                    NotConnected
            end
        end,
    Outcome =
        try emqx_utils:pmap(CheckOne, WorkerPids, Timeout) of
            Values -> {ok, Values}
        catch
            exit:timeout -> {error, timeout}
        end,
    case WantValues of
        true ->
            Outcome;
        false ->
            case Outcome of
                %% Non-empty list required: an empty pool is never healthy.
                {ok, [_ | _] = Values1} ->
                    lists:all(fun(V) -> V =:= true end, Values1);
                _EmptyOrFailed ->
                    false
            end
    end.
%% Unwrap a start failure of the shape
%%   {{shutdown, {failed_to_start_child, _, {shutdown, {failed_to_start_child, _, Cause}}}}, _}
%% down to the innermost `Cause`. Any term of a different shape is returned
%% unchanged.
parse_reason(
    {{shutdown, {failed_to_start_child, _Outer, {shutdown, {failed_to_start_child, _Inner, Cause}}}},
        _Extra}
) ->
    Cause;
parse_reason(Other) ->
    Other.