refactor(emqx_resource): change the status of emqx_resource to 'connected/connecting/disconnecting'

This commit is contained in:
EMQ-YangM 2022-02-21 14:33:19 +08:00
parent 521efac8ea
commit 376c9ee261
5 changed files with 29 additions and 29 deletions

View File

@ -25,7 +25,7 @@
mod := module(),
config := resource_config(),
state := resource_state(),
status := started | stopped | starting,
status := connected | disconnected | connecting,
metrics := emqx_plugin_libs_metrics:metrics()
}.
-type resource_group() :: binary().

View File

@ -60,7 +60,7 @@
-export([ restart/1 %% restart the instance.
, restart/2
, health_check/1 %% verify if the resource is working normally
, set_resource_status_stoped/1 %% set resource status to stopped
, set_resource_status_disconnected/1 %% set resource status to disconnected
, stop/1 %% stop the instance
, query/2 %% query the instance
, query/3 %% query the instance with after_query()
@ -190,13 +190,13 @@ query(InstId, Request) ->
-spec query(instance_id(), Request :: term(), after_query()) -> Result :: term().
query(InstId, Request, AfterQuery) ->
case get_instance(InstId) of
{ok, _Group, #{status := starting}} ->
query_error(starting, <<"cannot serve query when the resource "
"instance is still starting">>);
{ok, _Group, #{status := stopped}} ->
query_error(stopped, <<"cannot serve query when the resource "
"instance is stopped">>);
{ok, _Group, #{mod := Mod, state := ResourceState, status := started}} ->
{ok, _Group, #{status := connecting}} ->
query_error(connecting, <<"cannot serve query when the resource "
"instance is still connecting">>);
{ok, _Group, #{status := disconnected}} ->
query_error(disconnected, <<"cannot serve query when the resource "
"instance is disconnected">>);
{ok, _Group, #{mod := Mod, state := ResourceState, status := connected}} ->
%% the resource state is readonly to Module:on_query/4
%% and the `after_query()` functions should be thread safe
ok = emqx_plugin_libs_metrics:inc(resource_metrics, InstId, matched),
@ -225,8 +225,8 @@ stop(InstId) ->
health_check(InstId) ->
call_instance(InstId, {health_check, InstId}).
set_resource_status_stoped(InstId) ->
call_instance(InstId, {set_resource_status_stoped, InstId}).
set_resource_status_disconnected(InstId) ->
call_instance(InstId, {set_resource_status_disconnected, InstId}).
-spec get_instance(instance_id()) -> {ok, resource_group(), resource_data()} | {error, Reason :: term()}.
get_instance(InstId) ->

View File

@ -83,7 +83,7 @@ health_check_timeout_checker(Pid, Name, SleepTime, Timeout) ->
after Timeout ->
emqx_alarm:activate(Name, #{name => Name},
<<Name/binary, " health check timeout">>),
emqx_resource:set_resource_status_stoped(Name),
emqx_resource:set_resource_status_disconnected(Name),
receive
health_check_finish -> timer:sleep(SleepTime)
end

View File

@ -126,8 +126,8 @@ handle_call({stop, InstId}, _From, State) ->
handle_call({health_check, InstId}, _From, State) ->
{reply, do_health_check(InstId), State};
handle_call({set_resource_status_stoped, InstId}, _From, State) ->
{reply, do_set_resource_status_stoped(InstId), State};
handle_call({set_resource_status_disconnected, InstId}, _From, State) ->
{reply, do_set_resource_status_disconnected(InstId), State};
handle_call(Req, _From, State) ->
logger:error("Received unexpected call: ~p", [Req]),
@ -159,8 +159,8 @@ code_change(_OldVsn, State, _Extra) ->
do_recreate(InstId, ResourceType, NewConfig, Opts) ->
case lookup(InstId) of
{ok, Group, #{mod := ResourceType, status := started} = Data} ->
%% If this resource is in use (status='started'), we should make sure
{ok, Group, #{mod := ResourceType, status := connected} = Data} ->
%% If this resource is in use (status='connected'), we should make sure
%% the new config is OK before removing the old one.
case do_create_dry_run(ResourceType, NewConfig) of
ok ->
@ -235,7 +235,7 @@ do_restart(InstId, Opts) ->
do_start(InstId, Group, ResourceType, Config, Opts) when is_binary(InstId) ->
InitData = #{id => InstId, mod => ResourceType, config => Config,
status => starting, state => undefined},
status => connecting, state => undefined},
%% The `emqx_resource:call_start/3` need the instance exist beforehand
ets:insert(emqx_resource_instance, {InstId, Group, InitData}),
case maps:get(async_create, Opts, false) of
@ -261,7 +261,7 @@ start_and_check(InstId, Group, ResourceType, Config, Opts, Data) ->
true -> create_default_checker(InstId, Opts)
end;
{error, Reason} ->
ets:insert(emqx_resource_instance, {InstId, Group, Data#{status => stopped}}),
ets:insert(emqx_resource_instance, {InstId, Group, Data#{status => disconnected}}),
{error, Reason}
end.
@ -278,7 +278,7 @@ do_stop(_Group, #{state := undefined}) ->
do_stop(Group, #{id := InstId, mod := Mod, state := ResourceState} = Data) ->
_ = emqx_resource:call_stop(InstId, Mod, ResourceState),
_ = emqx_resource_health_check:delete_checker(InstId),
ets:insert(emqx_resource_instance, {InstId, Group, Data#{status => stopped}}),
ets:insert(emqx_resource_instance, {InstId, Group, Data#{status => disconnected}}),
ok.
do_health_check(InstId) when is_binary(InstId) ->
@ -290,20 +290,20 @@ do_health_check(Group, #{id := InstId, mod := Mod, state := ResourceState0} = Da
case emqx_resource:call_health_check(InstId, Mod, ResourceState0) of
{ok, ResourceState1} ->
ets:insert(emqx_resource_instance,
{InstId, Group, Data#{status => started, state => ResourceState1}}),
{InstId, Group, Data#{status => connected, state => ResourceState1}}),
ok;
{error, Reason, ResourceState1} ->
logger:error("health check for ~p failed: ~p", [InstId, Reason]),
ets:insert(emqx_resource_instance,
{InstId, Group, Data#{status => stopped, state => ResourceState1}}),
{InstId, Group, Data#{status => disconnected, state => ResourceState1}}),
{error, Reason}
end.
do_set_resource_status_stoped(InstId) ->
do_set_resource_status_disconnected(InstId) ->
case emqx_resource_instance:lookup(InstId) of
{ok, Group, #{id := InstId} = Data} ->
logger:error("health check for ~p failed: timeout", [InstId]),
ets:insert(emqx_resource_instance, {InstId, Group, Data#{status => stopped}});
ets:insert(emqx_resource_instance, {InstId, Group, Data#{status => disconnected}});
Error -> {error, Error}
end.

View File

@ -107,7 +107,7 @@ t_create_remove_local(_) ->
?assert(is_process_alive(Pid)),
emqx_resource:set_resource_status_stoped(?ID),
emqx_resource:set_resource_status_disconnected(?ID),
emqx_resource:recreate_local(
?ID,
@ -170,12 +170,12 @@ t_healthy(_) ->
emqx_resource_health_check:create_checker(?ID, 15000, 10000),
#{pid := Pid} = emqx_resource:query(?ID, get_state),
timer:sleep(300),
emqx_resource:set_resource_status_stoped(?ID),
emqx_resource:set_resource_status_disconnected(?ID),
ok = emqx_resource:health_check(?ID),
?assertMatch(
[#{status := started}],
[#{status := connected}],
emqx_resource:list_instances_verbose()),
erlang:exit(Pid, shutdown),
@ -185,7 +185,7 @@ t_healthy(_) ->
emqx_resource:health_check(?ID)),
?assertMatch(
[#{status := stopped}],
[#{status := disconnected}],
emqx_resource:list_instances_verbose()),
ok = emqx_resource:remove_local(?ID).
@ -217,7 +217,7 @@ t_stop_start(_) ->
?assertNot(is_process_alive(Pid0)),
?assertMatch({error, {emqx_resource, #{reason := stopped}}},
?assertMatch({error, {emqx_resource, #{reason := disconnected}}},
emqx_resource:query(?ID, get_state)),
ok = emqx_resource:restart(?ID),
@ -253,7 +253,7 @@ t_stop_start_local(_) ->
?assertNot(is_process_alive(Pid0)),
?assertMatch({error, {emqx_resource, #{reason := stopped}}},
?assertMatch({error, {emqx_resource, #{reason := disconnected}}},
emqx_resource:query(?ID, get_state)),
ok = emqx_resource:restart(?ID),