From fd7e0c800a05052f5b718c05b3301409ab4f158a Mon Sep 17 00:00:00 2001 From: EMQ-YangM Date: Wed, 19 Jan 2022 16:25:54 +0800 Subject: [PATCH 1/3] feat(emqx_resource_health_check): add timeout to single health_check --- apps/emqx_resource/src/emqx_resource.erl | 4 ++ .../src/emqx_resource_health_check.erl | 37 +++++++++++++++---- .../src/emqx_resource_instance.erl | 11 ++++++ 3 files changed, 44 insertions(+), 8 deletions(-) diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index d7066ef87..750b7cde5 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -60,6 +60,7 @@ -export([ restart/1 %% restart the instance. , restart/2 , health_check/1 %% verify if the resource is working normally + , set_resource_status_stoped/1 %% set resource status to stoped , stop/1 %% stop the instance , query/2 %% query the instance , query/3 %% query the instance with after_query() @@ -231,6 +232,9 @@ stop(InstId) -> health_check(InstId) -> call_instance(InstId, {health_check, InstId}). +set_resource_status_stoped(InstId) -> + call_instance(InstId, {set_resource_status_stoped, InstId}). + -spec get_instance(instance_id()) -> {ok, resource_data()} | {error, Reason :: term()}. get_instance(InstId) -> emqx_resource_instance:lookup(InstId). diff --git a/apps/emqx_resource/src/emqx_resource_health_check.erl b/apps/emqx_resource/src/emqx_resource_health_check.erl index 9e929c98b..b07e63609 100644 --- a/apps/emqx_resource/src/emqx_resource_health_check.erl +++ b/apps/emqx_resource/src/emqx_resource_health_check.erl @@ -20,7 +20,9 @@ , delete_checker/1 ]). --export([health_check/2]). +-export([ health_check/2 + , health_check_timeout_checker/3 + ]). -define(SUP, emqx_resource_health_check_sup). -define(ID(NAME), {resource_health_check, NAME}). 
@@ -33,6 +35,7 @@ child_spec(Name, Sleep) -> start_link(Name, Sleep) -> Pid = proc_lib:spawn_link(?MODULE, health_check, [Name, Sleep]), + _ = proc_lib:spawn_link(?MODULE, health_check_timeout_checker, [Pid, Name, Sleep]), {ok, Pid}. create_checker(Name, Sleep) -> @@ -55,12 +58,30 @@ delete_checker(Name) -> end. health_check(Name, SleepTime) -> - case emqx_resource:health_check(Name) of - ok -> - emqx_alarm:deactivate(Name); - {error, _} -> - emqx_alarm:activate(Name, #{name => Name}, - <>) + receive + {Pid, start_health_check} -> + case emqx_resource:health_check(Name) of + ok -> + emqx_alarm:deactivate(Name); + {error, _} -> + emqx_alarm:activate(Name, #{name => Name}, + <>) + end, + Pid ! health_check_finish end, - timer:sleep(SleepTime), health_check(Name, SleepTime). + +health_check_timeout_checker(Pid, Name, SleepTime) -> + SelfPid = self(), + Pid ! {SelfPid, start_health_check}, + receive + health_check_finish -> timer:sleep(SleepTime) + after 10000 -> + emqx_alarm:activate(Name, #{name => Name}, + <>), + emqx_resource:set_resource_status_stoped(Name), + receive + health_check_finish -> timer:sleep(SleepTime) + end + end, + health_check_timeout_checker(Pid, Name, SleepTime). diff --git a/apps/emqx_resource/src/emqx_resource_instance.erl b/apps/emqx_resource/src/emqx_resource_instance.erl index 74429fcc8..a5d7da1a1 100644 --- a/apps/emqx_resource/src/emqx_resource_instance.erl +++ b/apps/emqx_resource/src/emqx_resource_instance.erl @@ -120,6 +120,9 @@ handle_call({stop, InstId}, _From, State) -> handle_call({health_check, InstId}, _From, State) -> {reply, do_health_check(InstId), State}; +handle_call({set_resource_status_stoped, InstId}, _From, State) -> + {reply, do_set_resource_status_stoped(InstId), State}; + handle_call(Req, _From, State) -> logger:error("Received unexpected call: ~p", [Req]), {reply, ignored, State}. @@ -276,6 +279,14 @@ do_health_check(#{id := InstId, mod := Mod, state := ResourceState0} = Data) -> {error, Reason} end. 
+do_set_resource_status_stoped(InstId) -> + case emqx_resource_instance:lookup(InstId) of + {ok, #{id := InstId} = Data} -> + logger:error("health check for ~p failed: timeout", [InstId]), + ets:insert(emqx_resource_instance, {InstId, Data#{status => stopped}}); + Error -> {error, Error} + end. + %%------------------------------------------------------------------------------ %% internal functions %%------------------------------------------------------------------------------ From 8506bed48970fccfb338a8c9067378b8476b29f3 Mon Sep 17 00:00:00 2001 From: EMQ-YangM Date: Mon, 24 Jan 2022 10:18:18 +0800 Subject: [PATCH 2/3] fix(emqx_resource_health_check): link health_check, health_check_timeout_checker --- .../src/emqx_resource_health_check.erl | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/apps/emqx_resource/src/emqx_resource_health_check.erl b/apps/emqx_resource/src/emqx_resource_health_check.erl index b07e63609..f50b53e63 100644 --- a/apps/emqx_resource/src/emqx_resource_health_check.erl +++ b/apps/emqx_resource/src/emqx_resource_health_check.erl @@ -20,7 +20,7 @@ , delete_checker/1 ]). --export([ health_check/2 +-export([ start_health_check/2 , health_check_timeout_checker/3 ]). @@ -34,8 +34,7 @@ child_spec(Name, Sleep) -> shutdown => 5000, type => worker, modules => [?MODULE]}. start_link(Name, Sleep) -> - Pid = proc_lib:spawn_link(?MODULE, health_check, [Name, Sleep]), - _ = proc_lib:spawn_link(?MODULE, health_check_timeout_checker, [Pid, Name, Sleep]), + Pid = proc_lib:spawn_link(?MODULE, start_health_check, [Name, Sleep]), {ok, Pid}. create_checker(Name, Sleep) -> @@ -57,9 +56,14 @@ delete_checker(Name) -> Error -> Error end. -health_check(Name, SleepTime) -> +start_health_check(Name, Sleep) -> + Pid = self(), + _ = proc_lib:spawn_link(?MODULE, health_check_timeout_checker, [Pid, Name, Sleep]), + health_check(Name). 
+ +health_check(Name) -> receive - {Pid, start_health_check} -> + {Pid, begin_health_check} -> case emqx_resource:health_check(Name) of ok -> emqx_alarm:deactivate(Name); @@ -69,11 +73,11 @@ health_check(Name, SleepTime) -> end, Pid ! health_check_finish end, - health_check(Name, SleepTime). + health_check(Name). health_check_timeout_checker(Pid, Name, SleepTime) -> SelfPid = self(), - Pid ! {SelfPid, start_health_check}, + Pid ! {SelfPid, begin_health_check}, receive health_check_finish -> timer:sleep(SleepTime) after 10000 -> From c870a2c78cc077752d2fd0544ac2789062683e51 Mon Sep 17 00:00:00 2001 From: EMQ-YangM Date: Mon, 24 Jan 2022 14:24:31 +0800 Subject: [PATCH 3/3] test(emqx_resource_health_check): add async_create to create_local --- apps/emqx_resource/test/emqx_resource_SUITE.erl | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index 31f62f8aa..80c32b327 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -105,8 +105,9 @@ t_healthy(_) -> {ok, _} = emqx_resource:create_local( ?ID, ?TEST_RESOURCE, - #{name => <<"test_resource">>}), - + #{name => <<"test_resource">>}, #{async_create => true}), + timer:sleep(300), + emqx_resource_health_check:create_checker(?ID, 15000), #{pid := Pid} = emqx_resource:query(?ID, get_state), ok = emqx_resource:health_check(?ID),