Merge pull request #7140 from EMQ-YangM/tmp_change_status
refactor(emqx_resource): change the status of emqx_resource to 'conne…
This commit is contained in:
commit
47a4fa5732
|
@ -423,7 +423,7 @@ aggregate_metrics(AllMetrics) ->
|
|||
format_resp(#{id := Id, raw_config := RawConf,
|
||||
resource_data := #{status := Status, metrics := Metrics}}) ->
|
||||
{Type, BridgeName} = emqx_bridge:parse_bridge_id(Id),
|
||||
IsConnected = fun(started) -> connected; (_) -> disconnected end,
|
||||
IsConnected = fun(connected) -> connected; (_) -> disconnected end,
|
||||
RawConf#{
|
||||
id => Id,
|
||||
type => Type,
|
||||
|
|
|
@ -331,7 +331,7 @@ wait_for_resource_ready(InstId, 0) ->
|
|||
ct:fail(wait_resource_timeout);
|
||||
wait_for_resource_ready(InstId, Retry) ->
|
||||
case emqx_bridge:lookup(InstId) of
|
||||
{ok, #{resource_data := #{status := started}}} -> ok;
|
||||
{ok, #{resource_data := #{status := connected}}} -> ok;
|
||||
_ ->
|
||||
timer:sleep(100),
|
||||
wait_for_resource_ready(InstId, Retry-1)
|
||||
|
|
|
@ -21,7 +21,7 @@ An MySQL connector can be used as following:
|
|||
ssl => false,user => "root",verify => false},
|
||||
id => <<"mysql-abc">>,mod => emqx_connector_mysql,
|
||||
state => #{poolname => 'mysql-abc'},
|
||||
status => started}]
|
||||
status => connected}]
|
||||
(emqx@127.0.0.1)6> emqx_resource:query(<<"mysql-abc">>, {sql, <<"SELECT count(1)">>}).
|
||||
{ok,[<<"count(1)">>],[[1]]}
|
||||
```
|
||||
|
|
|
@ -695,7 +695,7 @@ wait_for_resource_ready(InstId, 0) ->
|
|||
ct:fail(wait_resource_timeout);
|
||||
wait_for_resource_ready(InstId, Retry) ->
|
||||
case emqx_bridge:lookup(InstId) of
|
||||
{ok, #{resource_data := #{status := started}}} -> ok;
|
||||
{ok, #{resource_data := #{status := connected}}} -> ok;
|
||||
_ ->
|
||||
timer:sleep(100),
|
||||
wait_for_resource_ready(InstId, Retry-1)
|
||||
|
|
|
@ -70,7 +70,7 @@ perform_lifecycle_check(PoolName, InitialConfig) ->
|
|||
?MYSQL_RESOURCE_MOD,
|
||||
CheckedConfig
|
||||
),
|
||||
?assertEqual(InitialStatus, started),
|
||||
?assertEqual(InitialStatus, connected),
|
||||
% Instance should match the state and status of the just started resource
|
||||
{ok, ?CONNECTOR_RESOURCE_GROUP, #{state := State, status := InitialStatus}} = emqx_resource:get_instance(PoolName),
|
||||
?assertEqual(ok, emqx_resource:health_check(PoolName)),
|
||||
|
@ -82,7 +82,7 @@ perform_lifecycle_check(PoolName, InitialConfig) ->
|
|||
% Resource will be listed still, but state will be changed and healthcheck will fail
|
||||
% as the worker no longer exists.
|
||||
{ok, ?CONNECTOR_RESOURCE_GROUP, #{state := State, status := StoppedStatus}} = emqx_resource:get_instance(PoolName),
|
||||
?assertEqual(StoppedStatus, stopped),
|
||||
?assertEqual(StoppedStatus, disconnected),
|
||||
?assertEqual({error,health_check_failed}, emqx_resource:health_check(PoolName)),
|
||||
% Resource healthcheck shortcuts things by checking ets. Go deeper by checking pool itself.
|
||||
?assertEqual({error, not_found}, ecpool:stop_sup_pool(ReturnedPoolName)),
|
||||
|
|
|
@ -70,7 +70,7 @@ perform_lifecycle_check(PoolName, InitialConfig) ->
|
|||
?PGSQL_RESOURCE_MOD,
|
||||
CheckedConfig
|
||||
),
|
||||
?assertEqual(InitialStatus, started),
|
||||
?assertEqual(InitialStatus, connected),
|
||||
% Instance should match the state and status of the just started resource
|
||||
{ok, ?CONNECTOR_RESOURCE_GROUP, #{state := State, status := InitialStatus}} = emqx_resource:get_instance(PoolName),
|
||||
?assertEqual(ok, emqx_resource:health_check(PoolName)),
|
||||
|
@ -81,7 +81,7 @@ perform_lifecycle_check(PoolName, InitialConfig) ->
|
|||
% Resource will be listed still, but state will be changed and healthcheck will fail
|
||||
% as the worker no longer exists.
|
||||
{ok, ?CONNECTOR_RESOURCE_GROUP, #{state := State, status := StoppedStatus}} = emqx_resource:get_instance(PoolName),
|
||||
?assertEqual(StoppedStatus, stopped),
|
||||
?assertEqual(StoppedStatus, disconnected),
|
||||
?assertEqual({error,health_check_failed}, emqx_resource:health_check(PoolName)),
|
||||
% Resource healthcheck shortcuts things by checking ets. Go deeper by checking pool itself.
|
||||
?assertEqual({error, not_found}, ecpool:stop_sup_pool(ReturnedPoolName)),
|
||||
|
|
|
@ -85,7 +85,7 @@ perform_lifecycle_check(PoolName, InitialConfig, RedisCommand) ->
|
|||
?REDIS_RESOURCE_MOD,
|
||||
CheckedConfig
|
||||
),
|
||||
?assertEqual(InitialStatus, started),
|
||||
?assertEqual(InitialStatus, connected),
|
||||
% Instance should match the state and status of the just started resource
|
||||
{ok, ?CONNECTOR_RESOURCE_GROUP, #{state := State, status := InitialStatus}} = emqx_resource:get_instance(PoolName),
|
||||
?assertEqual(ok, emqx_resource:health_check(PoolName)),
|
||||
|
@ -95,7 +95,7 @@ perform_lifecycle_check(PoolName, InitialConfig, RedisCommand) ->
|
|||
% Resource will be listed still, but state will be changed and healthcheck will fail
|
||||
% as the worker no longer exists.
|
||||
{ok, ?CONNECTOR_RESOURCE_GROUP, #{state := State, status := StoppedStatus}} = emqx_resource:get_instance(PoolName),
|
||||
?assertEqual(StoppedStatus, stopped),
|
||||
?assertEqual(StoppedStatus, disconnected),
|
||||
?assertEqual({error,health_check_failed}, emqx_resource:health_check(PoolName)),
|
||||
% Resource healthcheck shortcuts things by checking ets. Go deeper by checking pool itself.
|
||||
?assertEqual({error, not_found}, ecpool:stop_sup_pool(ReturnedPoolName)),
|
||||
|
|
|
@ -25,7 +25,7 @@
|
|||
mod := module(),
|
||||
config := resource_config(),
|
||||
state := resource_state(),
|
||||
status := started | stopped | starting,
|
||||
status := connected | disconnected | connecting,
|
||||
metrics := emqx_plugin_libs_metrics:metrics()
|
||||
}.
|
||||
-type resource_group() :: binary().
|
||||
|
|
|
@ -60,7 +60,7 @@
|
|||
-export([ restart/1 %% restart the instance.
|
||||
, restart/2
|
||||
, health_check/1 %% verify if the resource is working normally
|
||||
, set_resource_status_stoped/1 %% set resource status to stopped
|
||||
, set_resource_status_disconnected/1 %% set resource status to disconnected
|
||||
, stop/1 %% stop the instance
|
||||
, query/2 %% query the instance
|
||||
, query/3 %% query the instance with after_query()
|
||||
|
@ -190,13 +190,13 @@ query(InstId, Request) ->
|
|||
-spec query(instance_id(), Request :: term(), after_query()) -> Result :: term().
|
||||
query(InstId, Request, AfterQuery) ->
|
||||
case get_instance(InstId) of
|
||||
{ok, _Group, #{status := starting}} ->
|
||||
query_error(starting, <<"cannot serve query when the resource "
|
||||
"instance is still starting">>);
|
||||
{ok, _Group, #{status := stopped}} ->
|
||||
query_error(stopped, <<"cannot serve query when the resource "
|
||||
"instance is stopped">>);
|
||||
{ok, _Group, #{mod := Mod, state := ResourceState, status := started}} ->
|
||||
{ok, _Group, #{status := connecting}} ->
|
||||
query_error(connecting, <<"cannot serve query when the resource "
|
||||
"instance is still connecting">>);
|
||||
{ok, _Group, #{status := disconnected}} ->
|
||||
query_error(disconnected, <<"cannot serve query when the resource "
|
||||
"instance is disconnected">>);
|
||||
{ok, _Group, #{mod := Mod, state := ResourceState, status := connected}} ->
|
||||
%% the resource state is readonly to Module:on_query/4
|
||||
%% and the `after_query()` functions should be thread safe
|
||||
ok = emqx_plugin_libs_metrics:inc(resource_metrics, InstId, matched),
|
||||
|
@ -225,8 +225,8 @@ stop(InstId) ->
|
|||
health_check(InstId) ->
|
||||
call_instance(InstId, {health_check, InstId}).
|
||||
|
||||
set_resource_status_stoped(InstId) ->
|
||||
call_instance(InstId, {set_resource_status_stoped, InstId}).
|
||||
set_resource_status_disconnected(InstId) ->
|
||||
call_instance(InstId, {set_resource_status_disconnected, InstId}).
|
||||
|
||||
-spec get_instance(instance_id()) -> {ok, resource_group(), resource_data()} | {error, Reason :: term()}.
|
||||
get_instance(InstId) ->
|
||||
|
|
|
@ -83,7 +83,7 @@ health_check_timeout_checker(Pid, Name, SleepTime, Timeout) ->
|
|||
after Timeout ->
|
||||
emqx_alarm:activate(Name, #{name => Name},
|
||||
<<Name/binary, " health check timeout">>),
|
||||
emqx_resource:set_resource_status_stoped(Name),
|
||||
emqx_resource:set_resource_status_disconnected(Name),
|
||||
receive
|
||||
health_check_finish -> timer:sleep(SleepTime)
|
||||
end
|
||||
|
|
|
@ -126,8 +126,8 @@ handle_call({stop, InstId}, _From, State) ->
|
|||
handle_call({health_check, InstId}, _From, State) ->
|
||||
{reply, do_health_check(InstId), State};
|
||||
|
||||
handle_call({set_resource_status_stoped, InstId}, _From, State) ->
|
||||
{reply, do_set_resource_status_stoped(InstId), State};
|
||||
handle_call({set_resource_status_disconnected, InstId}, _From, State) ->
|
||||
{reply, do_set_resource_status_disconnected(InstId), State};
|
||||
|
||||
handle_call(Req, _From, State) ->
|
||||
logger:error("Received unexpected call: ~p", [Req]),
|
||||
|
@ -159,8 +159,8 @@ code_change(_OldVsn, State, _Extra) ->
|
|||
|
||||
do_recreate(InstId, ResourceType, NewConfig, Opts) ->
|
||||
case lookup(InstId) of
|
||||
{ok, Group, #{mod := ResourceType, status := started} = Data} ->
|
||||
%% If this resource is in use (status='started'), we should make sure
|
||||
{ok, Group, #{mod := ResourceType, status := connected} = Data} ->
|
||||
%% If this resource is in use (status='connected'), we should make sure
|
||||
%% the new config is OK before removing the old one.
|
||||
case do_create_dry_run(ResourceType, NewConfig) of
|
||||
ok ->
|
||||
|
@ -235,7 +235,7 @@ do_restart(InstId, Opts) ->
|
|||
|
||||
do_start(InstId, Group, ResourceType, Config, Opts) when is_binary(InstId) ->
|
||||
InitData = #{id => InstId, mod => ResourceType, config => Config,
|
||||
status => starting, state => undefined},
|
||||
status => connecting, state => undefined},
|
||||
%% The `emqx_resource:call_start/3` need the instance exist beforehand
|
||||
ets:insert(emqx_resource_instance, {InstId, Group, InitData}),
|
||||
case maps:get(async_create, Opts, false) of
|
||||
|
@ -261,7 +261,7 @@ start_and_check(InstId, Group, ResourceType, Config, Opts, Data) ->
|
|||
true -> create_default_checker(InstId, Opts)
|
||||
end;
|
||||
{error, Reason} ->
|
||||
ets:insert(emqx_resource_instance, {InstId, Group, Data#{status => stopped}}),
|
||||
ets:insert(emqx_resource_instance, {InstId, Group, Data#{status => disconnected}}),
|
||||
{error, Reason}
|
||||
end.
|
||||
|
||||
|
@ -278,7 +278,7 @@ do_stop(_Group, #{state := undefined}) ->
|
|||
do_stop(Group, #{id := InstId, mod := Mod, state := ResourceState} = Data) ->
|
||||
_ = emqx_resource:call_stop(InstId, Mod, ResourceState),
|
||||
_ = emqx_resource_health_check:delete_checker(InstId),
|
||||
ets:insert(emqx_resource_instance, {InstId, Group, Data#{status => stopped}}),
|
||||
ets:insert(emqx_resource_instance, {InstId, Group, Data#{status => disconnected}}),
|
||||
ok.
|
||||
|
||||
do_health_check(InstId) when is_binary(InstId) ->
|
||||
|
@ -290,20 +290,20 @@ do_health_check(Group, #{id := InstId, mod := Mod, state := ResourceState0} = Da
|
|||
case emqx_resource:call_health_check(InstId, Mod, ResourceState0) of
|
||||
{ok, ResourceState1} ->
|
||||
ets:insert(emqx_resource_instance,
|
||||
{InstId, Group, Data#{status => started, state => ResourceState1}}),
|
||||
{InstId, Group, Data#{status => connected, state => ResourceState1}}),
|
||||
ok;
|
||||
{error, Reason, ResourceState1} ->
|
||||
logger:error("health check for ~p failed: ~p", [InstId, Reason]),
|
||||
ets:insert(emqx_resource_instance,
|
||||
{InstId, Group, Data#{status => stopped, state => ResourceState1}}),
|
||||
{InstId, Group, Data#{status => disconnected, state => ResourceState1}}),
|
||||
{error, Reason}
|
||||
end.
|
||||
|
||||
do_set_resource_status_stoped(InstId) ->
|
||||
do_set_resource_status_disconnected(InstId) ->
|
||||
case emqx_resource_instance:lookup(InstId) of
|
||||
{ok, Group, #{id := InstId} = Data} ->
|
||||
logger:error("health check for ~p failed: timeout", [InstId]),
|
||||
ets:insert(emqx_resource_instance, {InstId, Group, Data#{status => stopped}});
|
||||
ets:insert(emqx_resource_instance, {InstId, Group, Data#{status => disconnected}});
|
||||
Error -> {error, Error}
|
||||
end.
|
||||
|
||||
|
|
|
@ -107,7 +107,7 @@ t_create_remove_local(_) ->
|
|||
|
||||
?assert(is_process_alive(Pid)),
|
||||
|
||||
emqx_resource:set_resource_status_stoped(?ID),
|
||||
emqx_resource:set_resource_status_disconnected(?ID),
|
||||
|
||||
emqx_resource:recreate_local(
|
||||
?ID,
|
||||
|
@ -170,12 +170,12 @@ t_healthy(_) ->
|
|||
emqx_resource_health_check:create_checker(?ID, 15000, 10000),
|
||||
#{pid := Pid} = emqx_resource:query(?ID, get_state),
|
||||
timer:sleep(300),
|
||||
emqx_resource:set_resource_status_stoped(?ID),
|
||||
emqx_resource:set_resource_status_disconnected(?ID),
|
||||
|
||||
ok = emqx_resource:health_check(?ID),
|
||||
|
||||
?assertMatch(
|
||||
[#{status := started}],
|
||||
[#{status := connected}],
|
||||
emqx_resource:list_instances_verbose()),
|
||||
|
||||
erlang:exit(Pid, shutdown),
|
||||
|
@ -185,7 +185,7 @@ t_healthy(_) ->
|
|||
emqx_resource:health_check(?ID)),
|
||||
|
||||
?assertMatch(
|
||||
[#{status := stopped}],
|
||||
[#{status := disconnected}],
|
||||
emqx_resource:list_instances_verbose()),
|
||||
|
||||
ok = emqx_resource:remove_local(?ID).
|
||||
|
@ -217,7 +217,7 @@ t_stop_start(_) ->
|
|||
|
||||
?assertNot(is_process_alive(Pid0)),
|
||||
|
||||
?assertMatch({error, {emqx_resource, #{reason := stopped}}},
|
||||
?assertMatch({error, {emqx_resource, #{reason := disconnected}}},
|
||||
emqx_resource:query(?ID, get_state)),
|
||||
|
||||
ok = emqx_resource:restart(?ID),
|
||||
|
@ -253,7 +253,7 @@ t_stop_start_local(_) ->
|
|||
|
||||
?assertNot(is_process_alive(Pid0)),
|
||||
|
||||
?assertMatch({error, {emqx_resource, #{reason := stopped}}},
|
||||
?assertMatch({error, {emqx_resource, #{reason := disconnected}}},
|
||||
emqx_resource:query(?ID, get_state)),
|
||||
|
||||
ok = emqx_resource:restart(?ID),
|
||||
|
|
Loading…
Reference in New Issue