feat(emqx_resource_health_check): add timeout params to health_check_timeout_checker
This commit is contained in:
parent
127384a9ae
commit
cb9f14f658
|
@ -15,38 +15,38 @@
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
-module(emqx_resource_health_check).
|
-module(emqx_resource_health_check).
|
||||||
|
|
||||||
-export([ start_link/2
|
-export([ start_link/3
|
||||||
, create_checker/2
|
, create_checker/3
|
||||||
, delete_checker/1
|
, delete_checker/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export([ start_health_check/2
|
-export([ start_health_check/3
|
||||||
, health_check_timeout_checker/3
|
, health_check_timeout_checker/4
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-define(SUP, emqx_resource_health_check_sup).
|
-define(SUP, emqx_resource_health_check_sup).
|
||||||
-define(ID(NAME), {resource_health_check, NAME}).
|
-define(ID(NAME), {resource_health_check, NAME}).
|
||||||
|
|
||||||
child_spec(Name, Sleep) ->
|
child_spec(Name, Sleep, Timeout) ->
|
||||||
#{id => ?ID(Name),
|
#{id => ?ID(Name),
|
||||||
start => {?MODULE, start_link, [Name, Sleep]},
|
start => {?MODULE, start_link, [Name, Sleep, Timeout]},
|
||||||
restart => transient,
|
restart => transient,
|
||||||
shutdown => 5000, type => worker, modules => [?MODULE]}.
|
shutdown => 5000, type => worker, modules => [?MODULE]}.
|
||||||
|
|
||||||
start_link(Name, Sleep) ->
|
start_link(Name, Sleep, Timeout) ->
|
||||||
Pid = proc_lib:spawn_link(?MODULE, start_health_check, [Name, Sleep]),
|
Pid = proc_lib:spawn_link(?MODULE, start_health_check, [Name, Sleep, Timeout]),
|
||||||
{ok, Pid}.
|
{ok, Pid}.
|
||||||
|
|
||||||
create_checker(Name, Sleep) ->
|
create_checker(Name, Sleep, Timeout) ->
|
||||||
create_checker(Name, Sleep, false).
|
create_checker(Name, Sleep, false, Timeout).
|
||||||
|
|
||||||
create_checker(Name, Sleep, Retry) ->
|
create_checker(Name, Sleep, Retry, Timeout) ->
|
||||||
case supervisor:start_child(?SUP, child_spec(Name, Sleep)) of
|
case supervisor:start_child(?SUP, child_spec(Name, Sleep, Timeout)) of
|
||||||
{ok, _} -> ok;
|
{ok, _} -> ok;
|
||||||
{error, already_present} -> ok;
|
{error, already_present} -> ok;
|
||||||
{error, {already_started, _}} when Retry == false ->
|
{error, {already_started, _}} when Retry == false ->
|
||||||
ok = delete_checker(Name),
|
ok = delete_checker(Name),
|
||||||
create_checker(Name, Sleep, true);
|
create_checker(Name, Sleep, true, Timeout);
|
||||||
Error -> Error
|
Error -> Error
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
@ -56,9 +56,9 @@ delete_checker(Name) ->
|
||||||
Error -> Error
|
Error -> Error
|
||||||
end.
|
end.
|
||||||
|
|
||||||
start_health_check(Name, Sleep) ->
|
start_health_check(Name, Sleep, Timeout) ->
|
||||||
Pid = self(),
|
Pid = self(),
|
||||||
_ = proc_lib:spawn_link(?MODULE, health_check_timeout_checker, [Pid, Name, Sleep]),
|
_ = proc_lib:spawn_link(?MODULE, health_check_timeout_checker, [Pid, Name, Sleep, Timeout]),
|
||||||
health_check(Name).
|
health_check(Name).
|
||||||
|
|
||||||
health_check(Name) ->
|
health_check(Name) ->
|
||||||
|
@ -75,12 +75,12 @@ health_check(Name) ->
|
||||||
end,
|
end,
|
||||||
health_check(Name).
|
health_check(Name).
|
||||||
|
|
||||||
health_check_timeout_checker(Pid, Name, SleepTime) ->
|
health_check_timeout_checker(Pid, Name, SleepTime, Timeout) ->
|
||||||
SelfPid = self(),
|
SelfPid = self(),
|
||||||
Pid ! {SelfPid, begin_health_check},
|
Pid ! {SelfPid, begin_health_check},
|
||||||
receive
|
receive
|
||||||
health_check_finish -> timer:sleep(SleepTime)
|
health_check_finish -> timer:sleep(SleepTime)
|
||||||
after 10000 ->
|
after Timeout ->
|
||||||
emqx_alarm:activate(Name, #{name => Name},
|
emqx_alarm:activate(Name, #{name => Name},
|
||||||
<<Name/binary, " health check timout">>),
|
<<Name/binary, " health check timout">>),
|
||||||
emqx_resource:set_resource_status_stoped(Name),
|
emqx_resource:set_resource_status_stoped(Name),
|
||||||
|
@ -88,4 +88,4 @@ health_check_timeout_checker(Pid, Name, SleepTime) ->
|
||||||
health_check_finish -> timer:sleep(SleepTime)
|
health_check_finish -> timer:sleep(SleepTime)
|
||||||
end
|
end
|
||||||
end,
|
end,
|
||||||
health_check_timeout_checker(Pid, Name, SleepTime).
|
health_check_timeout_checker(Pid, Name, SleepTime, Timeout).
|
||||||
|
|
|
@ -249,7 +249,8 @@ start_and_check(InstId, ResourceType, Config, Opts, Data) ->
|
||||||
case maps:get(async_create, Opts, false) of
|
case maps:get(async_create, Opts, false) of
|
||||||
false -> do_health_check(Data2);
|
false -> do_health_check(Data2);
|
||||||
true -> emqx_resource_health_check:create_checker(InstId,
|
true -> emqx_resource_health_check:create_checker(InstId,
|
||||||
maps:get(health_check_interval, Opts, 15000))
|
maps:get(health_check_interval, Opts, 15000),
|
||||||
|
maps:get(health_check_timeout, Opts, 10000))
|
||||||
end;
|
end;
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
ets:insert(emqx_resource_instance, {InstId, Data#{status => stopped}}),
|
ets:insert(emqx_resource_instance, {InstId, Data#{status => stopped}}),
|
||||||
|
|
|
@ -148,7 +148,7 @@ t_healthy(_) ->
|
||||||
#{async_create => true}),
|
#{async_create => true}),
|
||||||
timer:sleep(300),
|
timer:sleep(300),
|
||||||
|
|
||||||
emqx_resource_health_check:create_checker(?ID, 15000),
|
emqx_resource_health_check:create_checker(?ID, 15000, 10000),
|
||||||
#{pid := Pid} = emqx_resource:query(?ID, get_state),
|
#{pid := Pid} = emqx_resource:query(?ID, get_state),
|
||||||
timer:sleep(300),
|
timer:sleep(300),
|
||||||
emqx_resource:set_resource_status_stoped(?ID),
|
emqx_resource:set_resource_status_stoped(?ID),
|
||||||
|
|
Loading…
Reference in New Issue