From 376c9ee26198a7e2e4481f361aa3a0939fef08d8 Mon Sep 17 00:00:00 2001 From: EMQ-YangM Date: Mon, 21 Feb 2022 14:33:19 +0800 Subject: [PATCH] refactor(emqx_resource): change the status of emqx_resource to 'connected/connecting/disconnecting' --- apps/emqx_resource/include/emqx_resource.hrl | 2 +- apps/emqx_resource/src/emqx_resource.erl | 20 ++++++++--------- .../src/emqx_resource_health_check.erl | 2 +- .../src/emqx_resource_instance.erl | 22 +++++++++---------- .../test/emqx_resource_SUITE.erl | 12 +++++----- 5 files changed, 29 insertions(+), 29 deletions(-) diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index 0ea92d993..78aae4313 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -25,7 +25,7 @@ mod := module(), config := resource_config(), state := resource_state(), - status := started | stopped | starting, + status := connected | disconnected | connecting, metrics := emqx_plugin_libs_metrics:metrics() }. -type resource_group() :: binary(). diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index e6c33718f..770ba8fa1 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -60,7 +60,7 @@ -export([ restart/1 %% restart the instance. , restart/2 , health_check/1 %% verify if the resource is working normally - , set_resource_status_stoped/1 %% set resource status to stopped + , set_resource_status_disconnected/1 %% set resource status to disconnected , stop/1 %% stop the instance , query/2 %% query the instance , query/3 %% query the instance with after_query() @@ -190,13 +190,13 @@ query(InstId, Request) -> -spec query(instance_id(), Request :: term(), after_query()) -> Result :: term(). query(InstId, Request, AfterQuery) -> case get_instance(InstId) of - {ok, _Group, #{status := starting}} -> - query_error(starting, <<"cannot serve query when the resource " - "instance is still starting">>); - {ok, _Group, #{status := stopped}} -> - query_error(stopped, <<"cannot serve query when the resource " - "instance is stopped">>); - {ok, _Group, #{mod := Mod, state := ResourceState, status := started}} -> + {ok, _Group, #{status := connecting}} -> + query_error(connecting, <<"cannot serve query when the resource " + "instance is still connecting">>); + {ok, _Group, #{status := disconnected}} -> + query_error(disconnected, <<"cannot serve query when the resource " + "instance is disconnected">>); + {ok, _Group, #{mod := Mod, state := ResourceState, status := connected}} -> %% the resource state is readonly to Module:on_query/4 %% and the `after_query()` functions should be thread safe ok = emqx_plugin_libs_metrics:inc(resource_metrics, InstId, matched), @@ -225,8 +225,8 @@ stop(InstId) -> health_check(InstId) -> call_instance(InstId, {health_check, InstId}). -set_resource_status_stoped(InstId) -> - call_instance(InstId, {set_resource_status_stoped, InstId}). +set_resource_status_disconnected(InstId) -> + call_instance(InstId, {set_resource_status_disconnected, InstId}). -spec get_instance(instance_id()) -> {ok, resource_group(), resource_data()} | {error, Reason :: term()}. get_instance(InstId) -> diff --git a/apps/emqx_resource/src/emqx_resource_health_check.erl b/apps/emqx_resource/src/emqx_resource_health_check.erl index 5791463ea..801c9d02f 100644 --- a/apps/emqx_resource/src/emqx_resource_health_check.erl +++ b/apps/emqx_resource/src/emqx_resource_health_check.erl @@ -83,7 +83,7 @@ health_check_timeout_checker(Pid, Name, SleepTime, Timeout) -> after Timeout -> emqx_alarm:activate(Name, #{name => Name}, <>), - emqx_resource:set_resource_status_stoped(Name), + emqx_resource:set_resource_status_disconnected(Name), receive health_check_finish -> timer:sleep(SleepTime) end diff --git a/apps/emqx_resource/src/emqx_resource_instance.erl b/apps/emqx_resource/src/emqx_resource_instance.erl index 1b0effe64..115f8426a 100644 --- a/apps/emqx_resource/src/emqx_resource_instance.erl +++ b/apps/emqx_resource/src/emqx_resource_instance.erl @@ -126,8 +126,8 @@ handle_call({stop, InstId}, _From, State) -> handle_call({health_check, InstId}, _From, State) -> {reply, do_health_check(InstId), State}; -handle_call({set_resource_status_stoped, InstId}, _From, State) -> - {reply, do_set_resource_status_stoped(InstId), State}; +handle_call({set_resource_status_disconnected, InstId}, _From, State) -> + {reply, do_set_resource_status_disconnected(InstId), State}; handle_call(Req, _From, State) -> logger:error("Received unexpected call: ~p", [Req]), @@ -159,8 +159,8 @@ code_change(_OldVsn, State, _Extra) -> do_recreate(InstId, ResourceType, NewConfig, Opts) -> case lookup(InstId) of - {ok, Group, #{mod := ResourceType, status := started} = Data} -> - %% If this resource is in use (status='started'), we should make sure + {ok, Group, #{mod := ResourceType, status := connected} = Data} -> + %% If this resource is in use (status='connected'), we should make sure %% the new config is OK before removing the old one. case do_create_dry_run(ResourceType, NewConfig) of ok -> @@ -235,7 +235,7 @@ do_restart(InstId, Opts) -> do_start(InstId, Group, ResourceType, Config, Opts) when is_binary(InstId) -> InitData = #{id => InstId, mod => ResourceType, config => Config, - status => starting, state => undefined}, + status => connecting, state => undefined}, %% The `emqx_resource:call_start/3` need the instance exist beforehand ets:insert(emqx_resource_instance, {InstId, Group, InitData}), case maps:get(async_create, Opts, false) of @@ -261,7 +261,7 @@ start_and_check(InstId, Group, ResourceType, Config, Opts, Data) -> true -> create_default_checker(InstId, Opts) end; {error, Reason} -> - ets:insert(emqx_resource_instance, {InstId, Group, Data#{status => stopped}}), + ets:insert(emqx_resource_instance, {InstId, Group, Data#{status => disconnected}}), {error, Reason} end. @@ -278,7 +278,7 @@ do_stop(_Group, #{state := undefined}) -> do_stop(Group, #{id := InstId, mod := Mod, state := ResourceState} = Data) -> _ = emqx_resource:call_stop(InstId, Mod, ResourceState), _ = emqx_resource_health_check:delete_checker(InstId), - ets:insert(emqx_resource_instance, {InstId, Group, Data#{status => stopped}}), + ets:insert(emqx_resource_instance, {InstId, Group, Data#{status => disconnected}}), ok. do_health_check(InstId) when is_binary(InstId) -> @@ -290,20 +290,20 @@ do_health_check(Group, #{id := InstId, mod := Mod, state := ResourceState0} = Da case emqx_resource:call_health_check(InstId, Mod, ResourceState0) of {ok, ResourceState1} -> ets:insert(emqx_resource_instance, - {InstId, Group, Data#{status => started, state => ResourceState1}}), + {InstId, Group, Data#{status => connected, state => ResourceState1}}), ok; {error, Reason, ResourceState1} -> logger:error("health check for ~p failed: ~p", [InstId, Reason]), ets:insert(emqx_resource_instance, - {InstId, Group, Data#{status => stopped, state => ResourceState1}}), + {InstId, Group, Data#{status => disconnected, state => ResourceState1}}), {error, Reason} end. -do_set_resource_status_stoped(InstId) -> +do_set_resource_status_disconnected(InstId) -> case emqx_resource_instance:lookup(InstId) of {ok, Group, #{id := InstId} = Data} -> logger:error("health check for ~p failed: timeout", [InstId]), - ets:insert(emqx_resource_instance, {InstId, Group, Data#{status => stopped}}); + ets:insert(emqx_resource_instance, {InstId, Group, Data#{status => disconnected}}); Error -> {error, Error} end. diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index eeaa6a501..f0a2efcf8 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -107,7 +107,7 @@ t_create_remove_local(_) -> ?assert(is_process_alive(Pid)), - emqx_resource:set_resource_status_stoped(?ID), + emqx_resource:set_resource_status_disconnected(?ID), emqx_resource:recreate_local( ?ID, @@ -170,12 +170,12 @@ t_healthy(_) -> emqx_resource_health_check:create_checker(?ID, 15000, 10000), #{pid := Pid} = emqx_resource:query(?ID, get_state), timer:sleep(300), - emqx_resource:set_resource_status_stoped(?ID), + emqx_resource:set_resource_status_disconnected(?ID), ok = emqx_resource:health_check(?ID), ?assertMatch( - [#{status := started}], + [#{status := connected}], emqx_resource:list_instances_verbose()), erlang:exit(Pid, shutdown), @@ -185,7 +185,7 @@ t_healthy(_) -> emqx_resource:health_check(?ID)), ?assertMatch( - [#{status := stopped}], + [#{status := disconnected}], emqx_resource:list_instances_verbose()), ok = emqx_resource:remove_local(?ID). @@ -217,7 +217,7 @@ t_stop_start(_) -> ?assertNot(is_process_alive(Pid0)), - ?assertMatch({error, {emqx_resource, #{reason := stopped}}}, + ?assertMatch({error, {emqx_resource, #{reason := disconnected}}}, emqx_resource:query(?ID, get_state)), ok = emqx_resource:restart(?ID), @@ -253,7 +253,7 @@ t_stop_start_local(_) -> ?assertNot(is_process_alive(Pid0)), - ?assertMatch({error, {emqx_resource, #{reason := stopped}}}, + ?assertMatch({error, {emqx_resource, #{reason := disconnected}}}, emqx_resource:query(?ID, get_state)), ok = emqx_resource:restart(?ID),