feat(emqx_resource_health_check): add timeout to single health_check

This commit is contained in:
EMQ-YangM 2022-01-19 16:25:54 +08:00
parent e7dd401de9
commit fd7e0c800a
3 changed files with 44 additions and 8 deletions

View File

@ -60,6 +60,7 @@
-export([ restart/1 %% restart the instance. -export([ restart/1 %% restart the instance.
, restart/2 , restart/2
, health_check/1 %% verify if the resource is working normally , health_check/1 %% verify if the resource is working normally
, set_resource_status_stoped/1 %% set resource status to stopped
, stop/1 %% stop the instance , stop/1 %% stop the instance
, query/2 %% query the instance , query/2 %% query the instance
, query/3 %% query the instance with after_query() , query/3 %% query the instance with after_query()
@ -231,6 +232,9 @@ stop(InstId) ->
health_check(InstId) -> health_check(InstId) ->
call_instance(InstId, {health_check, InstId}). call_instance(InstId, {health_check, InstId}).
%% @doc Ask the instance identified by `InstId' to handle a
%% `{set_resource_status_stoped, InstId}' request.
%% The request is dispatched through call_instance/2 and its reply is
%% returned to the caller unchanged.
set_resource_status_stoped(InstId) ->
    Request = {set_resource_status_stoped, InstId},
    call_instance(InstId, Request).
-spec get_instance(instance_id()) -> {ok, resource_data()} | {error, Reason :: term()}. -spec get_instance(instance_id()) -> {ok, resource_data()} | {error, Reason :: term()}.
get_instance(InstId) -> get_instance(InstId) ->
emqx_resource_instance:lookup(InstId). emqx_resource_instance:lookup(InstId).

View File

@ -20,7 +20,9 @@
, delete_checker/1 , delete_checker/1
]). ]).
-export([health_check/2]). -export([ health_check/2
, health_check_timeout_checker/3
]).
-define(SUP, emqx_resource_health_check_sup). -define(SUP, emqx_resource_health_check_sup).
-define(ID(NAME), {resource_health_check, NAME}). -define(ID(NAME), {resource_health_check, NAME}).
@ -33,6 +35,7 @@ child_spec(Name, Sleep) ->
start_link(Name, Sleep) -> start_link(Name, Sleep) ->
Pid = proc_lib:spawn_link(?MODULE, health_check, [Name, Sleep]), Pid = proc_lib:spawn_link(?MODULE, health_check, [Name, Sleep]),
_ = proc_lib:spawn_link(?MODULE, health_check_timeout_checker, [Pid, Name, Sleep]),
{ok, Pid}. {ok, Pid}.
create_checker(Name, Sleep) -> create_checker(Name, Sleep) ->
@ -55,6 +58,8 @@ delete_checker(Name) ->
end. end.
health_check(Name, SleepTime) -> health_check(Name, SleepTime) ->
receive
{Pid, start_health_check} ->
case emqx_resource:health_check(Name) of case emqx_resource:health_check(Name) of
ok -> ok ->
emqx_alarm:deactivate(Name); emqx_alarm:deactivate(Name);
@ -62,5 +67,21 @@ health_check(Name, SleepTime) ->
emqx_alarm:activate(Name, #{name => Name}, emqx_alarm:activate(Name, #{name => Name},
<<Name/binary, " health check failed">>) <<Name/binary, " health check failed">>)
end, end,
timer:sleep(SleepTime), Pid ! health_check_finish
end,
health_check(Name, SleepTime). health_check(Name, SleepTime).
%% @doc Watchdog loop that drives the health-check process `Pid'.
%%
%% Each round sends `{self(), start_health_check}' to `Pid' and waits for a
%% `health_check_finish' reply:
%%   * reply within 10 seconds: sleep `SleepTime' ms and start the next round;
%%   * no reply within 10 seconds: activate an alarm for `Name', mark the
%%     resource as stopped, then keep waiting for the late reply before
%%     starting the next round.
%%
%% `Pid' is monitored for the lifetime of the loop. When it terminates the
%% function returns `ok' (ending the watchdog process) instead of raising a
%% bogus timeout alarm and then blocking forever on a reply that can never
%% arrive.
health_check_timeout_checker(Pid, Name, SleepTime) ->
    MRef = erlang:monitor(process, Pid),
    health_check_timeout_loop(Pid, MRef, Name, SleepTime).

%% One watchdog round per call; recurses until the monitored process is down.
health_check_timeout_loop(Pid, MRef, Name, SleepTime) ->
    Pid ! {self(), start_health_check},
    receive
        health_check_finish ->
            timer:sleep(SleepTime),
            health_check_timeout_loop(Pid, MRef, Name, SleepTime);
        {'DOWN', MRef, process, Pid, _Reason} ->
            %% The health-check process is gone; nothing left to supervise.
            ok
    after 10000 ->
        emqx_alarm:activate(Name, #{name => Name},
                            <<Name/binary, " health check timeout">>),
        %% NOTE(review): this goes through emqx_resource:call_instance/2; if
        %% that call is served by the same process that is stuck in the
        %% health check it may itself block or time out -- confirm.
        emqx_resource:set_resource_status_stoped(Name),
        %% Do not start another check while the previous one is still
        %% running: wait for it to finish, or for the worker to die.
        receive
            health_check_finish ->
                timer:sleep(SleepTime),
                health_check_timeout_loop(Pid, MRef, Name, SleepTime);
            {'DOWN', MRef, process, Pid, _LateReason} ->
                ok
        end
    end.

View File

@ -120,6 +120,9 @@ handle_call({stop, InstId}, _From, State) ->
handle_call({health_check, InstId}, _From, State) -> handle_call({health_check, InstId}, _From, State) ->
{reply, do_health_check(InstId), State}; {reply, do_health_check(InstId), State};
handle_call({set_resource_status_stoped, InstId}, _From, State) ->
{reply, do_set_resource_status_stoped(InstId), State};
handle_call(Req, _From, State) -> handle_call(Req, _From, State) ->
logger:error("Received unexpected call: ~p", [Req]), logger:error("Received unexpected call: ~p", [Req]),
{reply, ignored, State}. {reply, ignored, State}.
@ -276,6 +279,14 @@ do_health_check(#{id := InstId, mod := Mod, state := ResourceState0} = Data) ->
{error, Reason} {error, Reason}
end. end.
%% Record that the resource instance InstId is stopped because its health
%% check timed out.
%%
%% Looks the instance up, logs the timeout and overwrites the instance's entry
%% in the emqx_resource_instance ETS table with `status => stopped'.
%%
%% Returns `true' (the result of ets:insert/2) on success. When the lookup
%% fails, its `{error, Reason}' is returned as is.
do_set_resource_status_stoped(InstId) ->
    case emqx_resource_instance:lookup(InstId) of
        {ok, #{id := InstId} = Data} ->
            logger:error("health check for ~p failed: timeout", [InstId]),
            ets:insert(emqx_resource_instance, {InstId, Data#{status => stopped}});
        {error, _Reason} = Error ->
            %% Already a tagged error: pass it through rather than wrapping it
            %% a second time as {error, {error, Reason}}.
            Error;
        Unexpected ->
            %% Any other lookup result is still reported as an error instead
            %% of crashing the calling process.
            {error, Unexpected}
    end.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% internal functions %% internal functions
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------