diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index 566ddbb78..1f14e527d 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -60,6 +60,7 @@ -export([ restart/1 %% restart the instance. , restart/2 , health_check/1 %% verify if the resource is working normally + , set_resource_status_stoped/1 %% set resource status to stoped , stop/1 %% stop the instance , query/2 %% query the instance , query/3 %% query the instance with after_query() @@ -231,6 +232,9 @@ stop(InstId) -> health_check(InstId) -> call_instance(InstId, {health_check, InstId}). +set_resource_status_stoped(InstId) -> + call_instance(InstId, {set_resource_status_stoped, InstId}). + -spec get_instance(instance_id()) -> {ok, resource_data()} | {error, Reason :: term()}. get_instance(InstId) -> emqx_resource_instance:lookup(InstId). diff --git a/apps/emqx_resource/src/emqx_resource_health_check.erl b/apps/emqx_resource/src/emqx_resource_health_check.erl index 9e929c98b..f50b53e63 100644 --- a/apps/emqx_resource/src/emqx_resource_health_check.erl +++ b/apps/emqx_resource/src/emqx_resource_health_check.erl @@ -20,7 +20,9 @@ , delete_checker/1 ]). --export([health_check/2]). +-export([ start_health_check/2 + , health_check_timeout_checker/3 + ]). -define(SUP, emqx_resource_health_check_sup). -define(ID(NAME), {resource_health_check, NAME}). @@ -32,7 +34,7 @@ child_spec(Name, Sleep) -> shutdown => 5000, type => worker, modules => [?MODULE]}. start_link(Name, Sleep) -> - Pid = proc_lib:spawn_link(?MODULE, health_check, [Name, Sleep]), + Pid = proc_lib:spawn_link(?MODULE, start_health_check, [Name, Sleep]), {ok, Pid}. create_checker(Name, Sleep) -> @@ -54,13 +56,36 @@ delete_checker(Name) -> Error -> Error end. -health_check(Name, SleepTime) -> - case emqx_resource:health_check(Name) of - ok -> - emqx_alarm:deactivate(Name); - {error, _} -> - emqx_alarm:activate(Name, #{name => Name}, - <>) +start_health_check(Name, Sleep) -> + Pid = self(), + _ = proc_lib:spawn_link(?MODULE, health_check_timeout_checker, [Pid, Name, Sleep]), + health_check(Name). + +health_check(Name) -> + receive + {Pid, begin_health_check} -> + case emqx_resource:health_check(Name) of + ok -> + emqx_alarm:deactivate(Name); + {error, _} -> + emqx_alarm:activate(Name, #{name => Name}, + <>) + end, + Pid ! health_check_finish end, - timer:sleep(SleepTime), - health_check(Name, SleepTime). + health_check(Name). + +health_check_timeout_checker(Pid, Name, SleepTime) -> + SelfPid = self(), + Pid ! {SelfPid, begin_health_check}, + receive + health_check_finish -> timer:sleep(SleepTime) + after 10000 -> + emqx_alarm:activate(Name, #{name => Name}, + <>), + emqx_resource:set_resource_status_stoped(Name), + receive + health_check_finish -> timer:sleep(SleepTime) + end + end, + health_check_timeout_checker(Pid, Name, SleepTime). diff --git a/apps/emqx_resource/src/emqx_resource_instance.erl b/apps/emqx_resource/src/emqx_resource_instance.erl index eea53dcee..6308a5d60 100644 --- a/apps/emqx_resource/src/emqx_resource_instance.erl +++ b/apps/emqx_resource/src/emqx_resource_instance.erl @@ -120,6 +120,9 @@ handle_call({stop, InstId}, _From, State) -> handle_call({health_check, InstId}, _From, State) -> {reply, do_health_check(InstId), State}; +handle_call({set_resource_status_stoped, InstId}, _From, State) -> + {reply, do_set_resource_status_stoped(InstId), State}; + handle_call(Req, _From, State) -> logger:error("Received unexpected call: ~p", [Req]), {reply, ignored, State}. @@ -280,6 +283,14 @@ do_health_check(#{id := InstId, mod := Mod, state := ResourceState0} = Data) -> {error, Reason} end. +do_set_resource_status_stoped(InstId) -> + case emqx_resource_instance:lookup(InstId) of + {ok, #{id := InstId} = Data} -> + logger:error("health check for ~p failed: timeout", [InstId]), + ets:insert(emqx_resource_instance, {InstId, Data#{status => stopped}}); + Error -> {error, Error} + end. + %%------------------------------------------------------------------------------ %% internal functions %%------------------------------------------------------------------------------ diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index 59801bb30..6559f769c 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -104,8 +104,9 @@ t_healthy(_) -> {ok, _} = emqx_resource:create_local( ?ID, ?TEST_RESOURCE, - #{name => test_resource}), - + #{name => <<"test_resource">>}, #{async_create => true}), + timer:sleep(300), + emqx_resource_health_check:create_checker(?ID, 15000), #{pid := Pid} = emqx_resource:query(?ID, get_state), ok = emqx_resource:health_check(?ID),