refactor(resource): improve the process starting/stopping resource instances
This commit is contained in:
parent
95340b7baa
commit
2277b75b2f
|
@ -58,6 +58,7 @@
|
|||
%% Calls to the callback module with current resource state
|
||||
%% They also save the state after the call finished (except query/2,3).
|
||||
-export([ restart/1 %% restart the instance.
|
||||
, restart/2
|
||||
, health_check/1 %% verify if the resource is working normally
|
||||
, stop/1 %% stop the instance
|
||||
, query/2 %% query the instance
|
||||
|
@ -68,7 +69,6 @@
|
|||
-export([ call_start/3 %% start the instance
|
||||
, call_health_check/3 %% verify if the resource is working normally
|
||||
, call_stop/3 %% stop the instance
|
||||
, call_config_merge/4 %% merge the config when updating
|
||||
, call_jsonify/2
|
||||
]).
|
||||
|
||||
|
@ -86,12 +86,9 @@
|
|||
|
||||
-optional_callbacks([ on_query/4
|
||||
, on_health_check/2
|
||||
, on_config_merge/3
|
||||
, on_jsonify/1
|
||||
]).
|
||||
|
||||
-callback on_config_merge(resource_config(), resource_config(), term()) -> resource_config().
|
||||
|
||||
-callback on_jsonify(resource_config()) -> jsx:json_term().
|
||||
|
||||
%% when calling emqx_resource:start/1
|
||||
|
@ -169,18 +166,17 @@ create_dry_run(ResourceType, Config) ->
|
|||
-spec create_dry_run_local(resource_type(), resource_config()) ->
|
||||
ok | {error, Reason :: term()}.
|
||||
create_dry_run_local(ResourceType, Config) ->
|
||||
InstId = emqx_resource_instance:make_test_id(),
|
||||
call_instance(InstId, {create_dry_run, InstId, ResourceType, Config}).
|
||||
call_instance(<<?TEST_ID_PREFIX>>, {create_dry_run, ResourceType, Config}).
|
||||
|
||||
-spec recreate(instance_id(), resource_type(), resource_config(), term()) ->
|
||||
-spec recreate(instance_id(), resource_type(), resource_config(), create_opts()) ->
|
||||
{ok, resource_data()} | {error, Reason :: term()}.
|
||||
recreate(InstId, ResourceType, Config, Params) ->
|
||||
cluster_call(recreate_local, [InstId, ResourceType, Config, Params]).
|
||||
recreate(InstId, ResourceType, Config, Opts) ->
|
||||
cluster_call(recreate_local, [InstId, ResourceType, Config, Opts]).
|
||||
|
||||
-spec recreate_local(instance_id(), resource_type(), resource_config(), term()) ->
|
||||
-spec recreate_local(instance_id(), resource_type(), resource_config(), create_opts()) ->
|
||||
{ok, resource_data()} | {error, Reason :: term()}.
|
||||
recreate_local(InstId, ResourceType, Config, Params) ->
|
||||
call_instance(InstId, {recreate, InstId, ResourceType, Config, Params}).
|
||||
recreate_local(InstId, ResourceType, Config, Opts) ->
|
||||
call_instance(InstId, {recreate, InstId, ResourceType, Config, Opts}).
|
||||
|
||||
-spec remove(instance_id()) -> ok | {error, Reason :: term()}.
|
||||
remove(InstId) ->
|
||||
|
@ -216,7 +212,11 @@ query(InstId, Request, AfterQuery) ->
|
|||
|
||||
-spec restart(instance_id()) -> ok | {error, Reason :: term()}.
|
||||
restart(InstId) ->
|
||||
call_instance(InstId, {restart, InstId}).
|
||||
restart(InstId, #{}).
|
||||
|
||||
-spec restart(instance_id(), create_opts()) -> ok | {error, Reason :: term()}.
|
||||
restart(InstId, Opts) ->
|
||||
call_instance(InstId, {restart, InstId, Opts}).
|
||||
|
||||
-spec stop(instance_id()) -> ok | {error, Reason :: term()}.
|
||||
stop(InstId) ->
|
||||
|
@ -276,14 +276,6 @@ call_health_check(InstId, Mod, ResourceState) ->
|
|||
call_stop(InstId, Mod, ResourceState) ->
|
||||
?SAFE_CALL(Mod:on_stop(InstId, ResourceState)).
|
||||
|
||||
-spec call_config_merge(module(), resource_config(), resource_config(), term()) ->
|
||||
resource_config().
|
||||
call_config_merge(Mod, OldConfig, NewConfig, Params) ->
|
||||
case erlang:function_exported(Mod, on_config_merge, 3) of
|
||||
true -> ?SAFE_CALL(Mod:on_config_merge(OldConfig, NewConfig, Params));
|
||||
false -> NewConfig
|
||||
end.
|
||||
|
||||
-spec call_jsonify(module(), resource_config()) -> jsx:json_term().
|
||||
call_jsonify(Mod, Config) ->
|
||||
case erlang:function_exported(Mod, on_jsonify, 1) of
|
||||
|
@ -330,17 +322,17 @@ check_and_create_local(InstId, ResourceType, RawConfig, Opts) ->
|
|||
check_and_do(ResourceType, RawConfig,
|
||||
fun(InstConf) -> create_local(InstId, ResourceType, InstConf, Opts) end).
|
||||
|
||||
-spec check_and_recreate(instance_id(), resource_type(), raw_resource_config(), term()) ->
|
||||
-spec check_and_recreate(instance_id(), resource_type(), raw_resource_config(), create_opts()) ->
|
||||
{ok, resource_data()} | {error, term()}.
|
||||
check_and_recreate(InstId, ResourceType, RawConfig, Params) ->
|
||||
check_and_recreate(InstId, ResourceType, RawConfig, Opts) ->
|
||||
check_and_do(ResourceType, RawConfig,
|
||||
fun(InstConf) -> recreate(InstId, ResourceType, InstConf, Params) end).
|
||||
fun(InstConf) -> recreate(InstId, ResourceType, InstConf, Opts) end).
|
||||
|
||||
-spec check_and_recreate_local(instance_id(), resource_type(), raw_resource_config(), term()) ->
|
||||
-spec check_and_recreate_local(instance_id(), resource_type(), raw_resource_config(), create_opts()) ->
|
||||
{ok, resource_data()} | {error, term()}.
|
||||
check_and_recreate_local(InstId, ResourceType, RawConfig, Params) ->
|
||||
check_and_recreate_local(InstId, ResourceType, RawConfig, Opts) ->
|
||||
check_and_do(ResourceType, RawConfig,
|
||||
fun(InstConf) -> recreate_local(InstId, ResourceType, InstConf, Params) end).
|
||||
fun(InstConf) -> recreate_local(InstId, ResourceType, InstConf, Opts) end).
|
||||
|
||||
check_and_do(ResourceType, RawConfig, Do) when is_function(Do) ->
|
||||
case check_config(ResourceType, RawConfig) of
|
||||
|
|
|
@ -15,18 +15,10 @@
|
|||
%%--------------------------------------------------------------------
|
||||
-module(emqx_resource_health_check).
|
||||
|
||||
-export([child_spec/2]).
|
||||
|
||||
-export([start_link/2]).
|
||||
|
||||
-export([health_check/2]).
|
||||
|
||||
child_spec(Name, Sleep) ->
|
||||
#{id => {health_check, Name},
|
||||
start => {?MODULE, start_link, [Name, Sleep]},
|
||||
restart => transient,
|
||||
shutdown => 5000, type => worker, modules => [?MODULE]}.
|
||||
|
||||
start_link(Name, Sleep) ->
|
||||
Pid = proc_lib:spawn_link(?MODULE, health_check, [Name, Sleep]),
|
||||
{ok, Pid}.
|
||||
|
@ -38,6 +30,6 @@ health_check(Name, SleepTime) ->
|
|||
emqx_alarm:deactivate(Name);
|
||||
{error, _} ->
|
||||
emqx_alarm:activate(Name, #{name => Name},
|
||||
<<Name/binary, " health check failed">>)
|
||||
<<Name/binary, " health check failed">>)
|
||||
end,
|
||||
health_check(Name, SleepTime).
|
|
@ -20,8 +20,17 @@
|
|||
-export([start_link/0]).
|
||||
|
||||
-export([init/1,
|
||||
create_health_check_process/2,
|
||||
delete_health_check_process/1]).
|
||||
create_checker/2,
|
||||
delete_checker/1]).
|
||||
|
||||
-define(HEALTH_CHECK_MOD, emqx_resource_health_check).
|
||||
-define(ID(NAME), {resource_health_check, NAME}).
|
||||
|
||||
child_spec(Name, Sleep) ->
|
||||
#{id => ?ID(Name),
|
||||
start => {?HEALTH_CHECK_MOD, start_link, [Name, Sleep]},
|
||||
restart => transient,
|
||||
shutdown => 5000, type => worker, modules => [?HEALTH_CHECK_MOD]}.
|
||||
|
||||
start_link() ->
|
||||
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
|
||||
|
@ -30,11 +39,21 @@ init([]) ->
|
|||
SupFlags = #{strategy => one_for_one, intensity => 10, period => 10},
|
||||
{ok, {SupFlags, []}}.
|
||||
|
||||
create_health_check_process(Name, Sleep) ->
|
||||
supervisor:start_child(emqx_resource_health_check_sup,
|
||||
emqx_resource_health_check:child_spec(Name, Sleep)).
|
||||
create_checker(Name, Sleep) ->
|
||||
case supervisor:start_child(?MODULE, child_spec(Name, Sleep)) of
|
||||
{ok, _} -> ok;
|
||||
{error, already_present} -> ok;
|
||||
{error, {already_started, _}} -> ok;
|
||||
Error -> Error
|
||||
end.
|
||||
|
||||
delete_health_check_process(Name) ->
|
||||
_ = supervisor:terminate_child(emqx_resource_health_check_sup, {health_check, Name}),
|
||||
_ = supervisor:delete_child(emqx_resource_health_check_sup, {health_check, Name}),
|
||||
ok.
|
||||
delete_checker(Name) ->
|
||||
case supervisor:terminate_child(?MODULE, {health_check, Name}) of
|
||||
ok ->
|
||||
case supervisor:delete_child(?MODULE, {health_check, Name}) of
|
||||
{error, not_found} -> ok;
|
||||
Error -> Error
|
||||
end;
|
||||
{error, not_found} -> ok;
|
||||
Error -> Error
|
||||
end.
|
||||
|
|
|
@ -103,17 +103,17 @@ init({Pool, Id}) ->
|
|||
handle_call({create, InstId, ResourceType, Config, Opts}, _From, State) ->
|
||||
{reply, do_create(InstId, ResourceType, Config, Opts), State};
|
||||
|
||||
handle_call({create_dry_run, InstId, ResourceType, Config}, _From, State) ->
|
||||
{reply, do_create_dry_run(InstId, ResourceType, Config), State};
|
||||
handle_call({create_dry_run, ResourceType, Config}, _From, State) ->
|
||||
{reply, do_create_dry_run(ResourceType, Config), State};
|
||||
|
||||
handle_call({recreate, InstId, ResourceType, Config, Params}, _From, State) ->
|
||||
{reply, do_recreate(InstId, ResourceType, Config, Params), State};
|
||||
handle_call({recreate, InstId, ResourceType, Config, Opts}, _From, State) ->
|
||||
{reply, do_recreate(InstId, ResourceType, Config, Opts), State};
|
||||
|
||||
handle_call({remove, InstId}, _From, State) ->
|
||||
{reply, do_remove(InstId), State};
|
||||
|
||||
handle_call({restart, InstId}, _From, State) ->
|
||||
{reply, do_restart(InstId), State};
|
||||
handle_call({restart, InstId, Opts}, _From, State) ->
|
||||
{reply, do_restart(InstId, Opts), State};
|
||||
|
||||
handle_call({stop, InstId}, _From, State) ->
|
||||
{reply, do_stop(InstId), State};
|
||||
|
@ -142,20 +142,17 @@ code_change(_OldVsn, State, _Extra) ->
|
|||
%% suppress the race condition check, as these functions are protected in gproc workers
|
||||
-dialyzer({nowarn_function, [do_recreate/4,
|
||||
do_create/4,
|
||||
do_restart/1,
|
||||
do_restart/2,
|
||||
do_stop/1,
|
||||
do_health_check/1]}).
|
||||
|
||||
do_recreate(InstId, ResourceType, NewConfig, Params) ->
|
||||
do_recreate(InstId, ResourceType, NewConfig, Opts) ->
|
||||
case lookup(InstId) of
|
||||
{ok, #{mod := ResourceType, state := ResourceState, config := OldConfig}} ->
|
||||
Config = emqx_resource:call_config_merge(ResourceType, OldConfig,
|
||||
NewConfig, Params),
|
||||
TestInstId = make_test_id(),
|
||||
case do_create_dry_run(TestInstId, ResourceType, Config) of
|
||||
{ok, #{mod := ResourceType} = Data} ->
|
||||
case do_create_dry_run(ResourceType, NewConfig) of
|
||||
ok ->
|
||||
do_remove(ResourceType, InstId, ResourceState, false),
|
||||
do_create(InstId, ResourceType, Config, #{force_create => true});
|
||||
do_remove(Data, false),
|
||||
do_create(InstId, ResourceType, NewConfig, Opts#{force_create => true});
|
||||
Error ->
|
||||
Error
|
||||
end;
|
||||
|
@ -166,100 +163,86 @@ do_recreate(InstId, ResourceType, NewConfig, Params) ->
|
|||
end.
|
||||
|
||||
do_create(InstId, ResourceType, Config, Opts) ->
|
||||
ForceCreate = maps:get(force_create, Opts, false),
|
||||
case lookup(InstId) of
|
||||
{ok, _} -> {ok, already_created};
|
||||
_ ->
|
||||
Res0 = #{id => InstId, mod => ResourceType, config => Config,
|
||||
status => starting, state => undefined},
|
||||
%% The `emqx_resource:call_start/3` need the instance exist beforehand
|
||||
ets:insert(emqx_resource_instance, {InstId, Res0}),
|
||||
case emqx_resource:call_start(InstId, ResourceType, Config) of
|
||||
{ok, ResourceState} ->
|
||||
ok = emqx_plugin_libs_metrics:create_metrics(resource_metrics, InstId),
|
||||
%% this is the first time we do health check, this will update the
|
||||
%% status and then do ets:insert/2
|
||||
_ = do_health_check(Res0#{state => ResourceState}),
|
||||
HealthCheckInterval = maps:get(health_check_interval, Opts, 15000),
|
||||
emqx_resource_health_check_sup:create_health_check_process(InstId, HealthCheckInterval),
|
||||
{error, not_found} ->
|
||||
case do_start(InstId, ResourceType, Config, Opts) of
|
||||
ok ->
|
||||
ok = emqx_resource_health_check_sup:create_checker(InstId,
|
||||
maps:get(health_check_interval, Opts, 15000)),
|
||||
ok = emqx_plugin_libs_metrics:clear_metrics(resource_metrics, InstId),
|
||||
{ok, force_lookup(InstId)};
|
||||
{error, Reason} when ForceCreate == true ->
|
||||
logger:error("start ~ts resource ~ts failed: ~p, "
|
||||
"force_create it as a stopped resource",
|
||||
[ResourceType, InstId, Reason]),
|
||||
ets:insert(emqx_resource_instance, {InstId, Res0}),
|
||||
{ok, Res0};
|
||||
{error, Reason} when ForceCreate == false ->
|
||||
ets:delete(emqx_resource_instance, InstId),
|
||||
{error, Reason}
|
||||
Error -> Error
|
||||
end
|
||||
end.
|
||||
|
||||
do_create_dry_run(InstId, ResourceType, Config) ->
|
||||
case emqx_resource:call_start(InstId, ResourceType, Config) of
|
||||
{ok, ResourceState0} ->
|
||||
Return = case emqx_resource:call_health_check(InstId, ResourceType, ResourceState0) of
|
||||
{ok, ResourceState1} -> ok;
|
||||
{error, Reason, ResourceState1} ->
|
||||
{error, Reason}
|
||||
end,
|
||||
_ = emqx_resource:call_stop(InstId, ResourceType, ResourceState1),
|
||||
do_create_dry_run(ResourceType, Config) ->
|
||||
InstId = make_test_id(),
|
||||
Opts = #{force_create => false},
|
||||
case do_create(InstId, ResourceType, Config, Opts) of
|
||||
{ok, Data} ->
|
||||
Return = do_health_check(Data),
|
||||
_ = do_remove(Data),
|
||||
Return;
|
||||
{error, Reason} ->
|
||||
{error, Reason}
|
||||
end.
|
||||
|
||||
do_remove(InstId) ->
|
||||
case lookup(InstId) of
|
||||
{ok, #{mod := Mod, state := ResourceState}} ->
|
||||
do_remove(Mod, InstId, ResourceState);
|
||||
Error ->
|
||||
Error
|
||||
end.
|
||||
do_remove(Instance) ->
|
||||
do_remove(Instance, true).
|
||||
|
||||
do_remove(Mod, InstId, ResourceState) ->
|
||||
do_remove(Mod, InstId, ResourceState, true).
|
||||
|
||||
do_remove(Mod, InstId, ResourceState, ClearMetrics) ->
|
||||
_ = emqx_resource:call_stop(InstId, Mod, ResourceState),
|
||||
do_remove(InstId, ClearMetrics) when is_binary(InstId) ->
|
||||
do_with_instance_data(InstId, fun do_remove/2, [ClearMetrics]);
|
||||
do_remove(#{id := InstId} = Data, ClearMetrics) ->
|
||||
_ = do_stop(Data),
|
||||
ets:delete(emqx_resource_instance, InstId),
|
||||
case ClearMetrics of
|
||||
true -> ok = emqx_plugin_libs_metrics:clear_metrics(resource_metrics, InstId);
|
||||
false -> ok
|
||||
end,
|
||||
_ = emqx_resource_health_check_sup:delete_health_check_process(InstId),
|
||||
_ = emqx_resource_health_check_sup:delete_checker(InstId),
|
||||
ok.
|
||||
|
||||
do_restart(InstId) ->
|
||||
do_restart(InstId, Opts) ->
|
||||
case lookup(InstId) of
|
||||
{ok, #{mod := Mod, state := ResourceState, config := Config} = Data} ->
|
||||
_ = case ResourceState of
|
||||
undefined -> ok;
|
||||
_ -> emqx_resource:call_stop(InstId, Mod, ResourceState)
|
||||
end,
|
||||
case emqx_resource:call_start(InstId, Mod, Config) of
|
||||
{ok, NewResourceState} ->
|
||||
ets:insert(emqx_resource_instance,
|
||||
{InstId, Data#{state => NewResourceState, status => started}}),
|
||||
ok;
|
||||
{error, Reason} ->
|
||||
ets:insert(emqx_resource_instance, {InstId, Data#{status => stopped}}),
|
||||
{error, Reason}
|
||||
end;
|
||||
{ok, #{mod := ResourceType, config := Config} = Data} ->
|
||||
ok = do_stop(Data),
|
||||
do_start(InstId, ResourceType, Config, Opts);
|
||||
Error ->
|
||||
Error
|
||||
end.
|
||||
|
||||
do_stop(InstId) ->
|
||||
case lookup(InstId) of
|
||||
{ok, #{mod := Mod, state := ResourceState} = Data} ->
|
||||
_ = emqx_resource:call_stop(InstId, Mod, ResourceState),
|
||||
ets:insert(emqx_resource_instance, {InstId, Data#{status => stopped}}),
|
||||
do_start(InstId, ResourceType, Config, Opts) when is_binary(InstId) ->
|
||||
ForceCreate = maps:get(force_create, Opts, false),
|
||||
Res0 = #{id => InstId, mod => ResourceType, config => Config,
|
||||
status => starting, state => undefined},
|
||||
%% The `emqx_resource:call_start/3` need the instance exist beforehand
|
||||
ets:insert(emqx_resource_instance, {InstId, Res0}),
|
||||
case emqx_resource:call_start(InstId, ResourceType, Config) of
|
||||
{ok, ResourceState} ->
|
||||
%% this is the first time we do health check, this will update the
|
||||
%% status and then do ets:insert/2
|
||||
_ = do_health_check(Res0#{state => ResourceState}),
|
||||
ok;
|
||||
Error ->
|
||||
Error
|
||||
{error, Reason} when ForceCreate == true ->
|
||||
logger:warning("start ~ts resource ~ts failed: ~p, force_create it",
|
||||
[ResourceType, InstId, Reason]),
|
||||
ets:insert(emqx_resource_instance, {InstId, Res0}),
|
||||
ok;
|
||||
{error, Reason} when ForceCreate == false ->
|
||||
ets:delete(emqx_resource_instance, InstId),
|
||||
{error, Reason}
|
||||
end.
|
||||
|
||||
do_stop(InstId) when is_binary(InstId) ->
|
||||
do_with_instance_data(InstId, fun do_stop/1, []);
|
||||
do_stop(#{state := undefined}) ->
|
||||
ok;
|
||||
do_stop(#{id := InstId, mod := Mod, state := ResourceState} = Data) ->
|
||||
_ = emqx_resource:call_stop(InstId, Mod, ResourceState),
|
||||
ets:insert(emqx_resource_instance, {InstId, Data#{status => stopped}}),
|
||||
ok.
|
||||
|
||||
do_health_check(InstId) when is_binary(InstId) ->
|
||||
case lookup(InstId) of
|
||||
{ok, Data} -> do_health_check(Data);
|
||||
|
@ -284,6 +267,12 @@ do_health_check(#{id := InstId, mod := Mod, state := ResourceState0} = Data) ->
|
|||
%% internal functions
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
do_with_instance_data(InstId, Do, Args) ->
|
||||
case lookup(InstId) of
|
||||
{ok, Data} -> erlang:apply(Do, [Data | Args]);
|
||||
Error -> Error
|
||||
end.
|
||||
|
||||
proc_name(Mod, Id) ->
|
||||
list_to_atom(lists:concat([Mod, "_", Id])).
|
||||
|
||||
|
|
|
@ -49,7 +49,7 @@ init([]) ->
|
|||
#{id => emqx_resource_health_check_sup,
|
||||
start => {emqx_resource_health_check_sup, start_link, []},
|
||||
restart => transient,
|
||||
shutdown => 5000, type => supervisor, modules => [emqx_resource_health_check_sup]},
|
||||
shutdown => infinity, type => supervisor, modules => [emqx_resource_health_check_sup]},
|
||||
{ok, {SupFlags, [HealthCheck, Metrics | ResourceInsts]}}.
|
||||
|
||||
%% internal functions
|
||||
|
|
Loading…
Reference in New Issue