From 2277b75b2f01d754c6bcb0402072d988a7854618 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Sun, 2 Jan 2022 16:41:50 +0800 Subject: [PATCH] refactor(resource): improve the process starting/stopping resource instances --- apps/emqx_resource/src/emqx_resource.erl | 46 +++--- .../src/emqx_resource_health_check.erl | 24 +-- .../src/emqx_resource_health_check_sup.erl | 37 +++-- .../src/emqx_resource_instance.erl | 153 ++++++++---------- apps/emqx_resource/src/emqx_resource_sup.erl | 2 +- 5 files changed, 127 insertions(+), 135 deletions(-) diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index 5ec5fd92a..12ae912e8 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -58,6 +58,7 @@ %% Calls to the callback module with current resource state %% They also save the state after the call finished (except query/2,3). -export([ restart/1 %% restart the instance. + , restart/2 , health_check/1 %% verify if the resource is working normally , stop/1 %% stop the instance , query/2 %% query the instance @@ -68,7 +69,6 @@ -export([ call_start/3 %% start the instance , call_health_check/3 %% verify if the resource is working normally , call_stop/3 %% stop the instance - , call_config_merge/4 %% merge the config when updating , call_jsonify/2 ]). @@ -86,12 +86,9 @@ -optional_callbacks([ on_query/4 , on_health_check/2 - , on_config_merge/3 , on_jsonify/1 ]). --callback on_config_merge(resource_config(), resource_config(), term()) -> resource_config(). - -callback on_jsonify(resource_config()) -> jsx:json_term(). %% when calling emqx_resource:start/1 @@ -169,18 +166,17 @@ create_dry_run(ResourceType, Config) -> -spec create_dry_run_local(resource_type(), resource_config()) -> ok | {error, Reason :: term()}. create_dry_run_local(ResourceType, Config) -> - InstId = emqx_resource_instance:make_test_id(), - call_instance(InstId, {create_dry_run, InstId, ResourceType, Config}). 
+ call_instance(<>, {create_dry_run, ResourceType, Config}). --spec recreate(instance_id(), resource_type(), resource_config(), term()) -> +-spec recreate(instance_id(), resource_type(), resource_config(), create_opts()) -> {ok, resource_data()} | {error, Reason :: term()}. -recreate(InstId, ResourceType, Config, Params) -> - cluster_call(recreate_local, [InstId, ResourceType, Config, Params]). +recreate(InstId, ResourceType, Config, Opts) -> + cluster_call(recreate_local, [InstId, ResourceType, Config, Opts]). --spec recreate_local(instance_id(), resource_type(), resource_config(), term()) -> +-spec recreate_local(instance_id(), resource_type(), resource_config(), create_opts()) -> {ok, resource_data()} | {error, Reason :: term()}. -recreate_local(InstId, ResourceType, Config, Params) -> - call_instance(InstId, {recreate, InstId, ResourceType, Config, Params}). +recreate_local(InstId, ResourceType, Config, Opts) -> + call_instance(InstId, {recreate, InstId, ResourceType, Config, Opts}). -spec remove(instance_id()) -> ok | {error, Reason :: term()}. remove(InstId) -> @@ -216,7 +212,11 @@ query(InstId, Request, AfterQuery) -> -spec restart(instance_id()) -> ok | {error, Reason :: term()}. restart(InstId) -> - call_instance(InstId, {restart, InstId}). + restart(InstId, #{}). + +-spec restart(instance_id(), create_opts()) -> ok | {error, Reason :: term()}. +restart(InstId, Opts) -> + call_instance(InstId, {restart, InstId, Opts}). -spec stop(instance_id()) -> ok | {error, Reason :: term()}. stop(InstId) -> @@ -276,14 +276,6 @@ call_health_check(InstId, Mod, ResourceState) -> call_stop(InstId, Mod, ResourceState) -> ?SAFE_CALL(Mod:on_stop(InstId, ResourceState)). --spec call_config_merge(module(), resource_config(), resource_config(), term()) -> - resource_config(). 
-call_config_merge(Mod, OldConfig, NewConfig, Params) -> - case erlang:function_exported(Mod, on_config_merge, 3) of - true -> ?SAFE_CALL(Mod:on_config_merge(OldConfig, NewConfig, Params)); - false -> NewConfig - end. - -spec call_jsonify(module(), resource_config()) -> jsx:json_term(). call_jsonify(Mod, Config) -> case erlang:function_exported(Mod, on_jsonify, 1) of @@ -330,17 +322,17 @@ check_and_create_local(InstId, ResourceType, RawConfig, Opts) -> check_and_do(ResourceType, RawConfig, fun(InstConf) -> create_local(InstId, ResourceType, InstConf, Opts) end). --spec check_and_recreate(instance_id(), resource_type(), raw_resource_config(), term()) -> +-spec check_and_recreate(instance_id(), resource_type(), raw_resource_config(), create_opts()) -> {ok, resource_data()} | {error, term()}. -check_and_recreate(InstId, ResourceType, RawConfig, Params) -> +check_and_recreate(InstId, ResourceType, RawConfig, Opts) -> check_and_do(ResourceType, RawConfig, - fun(InstConf) -> recreate(InstId, ResourceType, InstConf, Params) end). + fun(InstConf) -> recreate(InstId, ResourceType, InstConf, Opts) end). --spec check_and_recreate_local(instance_id(), resource_type(), raw_resource_config(), term()) -> +-spec check_and_recreate_local(instance_id(), resource_type(), raw_resource_config(), create_opts()) -> {ok, resource_data()} | {error, term()}. -check_and_recreate_local(InstId, ResourceType, RawConfig, Params) -> +check_and_recreate_local(InstId, ResourceType, RawConfig, Opts) -> check_and_do(ResourceType, RawConfig, - fun(InstConf) -> recreate_local(InstId, ResourceType, InstConf, Params) end). + fun(InstConf) -> recreate_local(InstId, ResourceType, InstConf, Opts) end). 
check_and_do(ResourceType, RawConfig, Do) when is_function(Do) -> case check_config(ResourceType, RawConfig) of diff --git a/apps/emqx_resource/src/emqx_resource_health_check.erl b/apps/emqx_resource/src/emqx_resource_health_check.erl index 50b236daa..dc0795fb3 100644 --- a/apps/emqx_resource/src/emqx_resource_health_check.erl +++ b/apps/emqx_resource/src/emqx_resource_health_check.erl @@ -15,29 +15,21 @@ %%-------------------------------------------------------------------- -module(emqx_resource_health_check). --export([child_spec/2]). - -export([start_link/2]). -export([health_check/2]). -child_spec(Name, Sleep) -> - #{id => {health_check, Name}, - start => {?MODULE, start_link, [Name, Sleep]}, - restart => transient, - shutdown => 5000, type => worker, modules => [?MODULE]}. - -start_link(Name, Sleep) -> +start_link(Name, Sleep) -> Pid = proc_lib:spawn_link(?MODULE, health_check, [Name, Sleep]), {ok, Pid}. -health_check(Name, SleepTime) -> +health_check(Name, SleepTime) -> timer:sleep(SleepTime), - case emqx_resource:health_check(Name) of - ok -> + case emqx_resource:health_check(Name) of + ok -> emqx_alarm:deactivate(Name); - {error, _} -> - emqx_alarm:activate(Name, #{name => Name}, - <>) + {error, _} -> + emqx_alarm:activate(Name, #{name => Name}, + <>) end, - health_check(Name, SleepTime). \ No newline at end of file + health_check(Name, SleepTime). diff --git a/apps/emqx_resource/src/emqx_resource_health_check_sup.erl b/apps/emqx_resource/src/emqx_resource_health_check_sup.erl index 571cd6338..6a2b07e94 100644 --- a/apps/emqx_resource/src/emqx_resource_health_check_sup.erl +++ b/apps/emqx_resource/src/emqx_resource_health_check_sup.erl @@ -20,8 +20,17 @@ -export([start_link/0]). -export([init/1, - create_health_check_process/2, - delete_health_check_process/1]). + create_checker/2, + delete_checker/1]). + +-define(HEALTH_CHECK_MOD, emqx_resource_health_check). +-define(ID(NAME), {resource_health_check, NAME}). 
+ +child_spec(Name, Sleep) -> + #{id => ?ID(Name), + start => {?HEALTH_CHECK_MOD, start_link, [Name, Sleep]}, + restart => transient, + shutdown => 5000, type => worker, modules => [?HEALTH_CHECK_MOD]}. start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). @@ -30,11 +39,22 @@ init([]) -> SupFlags = #{strategy => one_for_one, intensity => 10, period => 10}, {ok, {SupFlags, []}}. -create_health_check_process(Name, Sleep) -> - supervisor:start_child(emqx_resource_health_check_sup, - emqx_resource_health_check:child_spec(Name, Sleep)). +create_checker(Name, Sleep) -> + case supervisor:start_child(?MODULE, child_spec(Name, Sleep)) of + {ok, _} -> ok; + {error, already_present} -> ok; + {error, {already_started, _}} -> ok; + Error -> Error + end. -delete_health_check_process(Name) -> - _ = supervisor:terminate_child(emqx_resource_health_check_sup, {health_check, Name}), - _ = supervisor:delete_child(emqx_resource_health_check_sup, {health_check, Name}), - ok. \ No newline at end of file +%% Stop and remove the checker; it must be looked up by ?ID(Name), the id child_spec/2 registers it under. +delete_checker(Name) -> + case supervisor:terminate_child(?MODULE, ?ID(Name)) of + ok -> + case supervisor:delete_child(?MODULE, ?ID(Name)) of + {error, not_found} -> ok; + Error -> Error + end; + {error, not_found} -> ok; + Error -> Error + end. 
diff --git a/apps/emqx_resource/src/emqx_resource_instance.erl b/apps/emqx_resource/src/emqx_resource_instance.erl index b2f2d0d27..2701b70d6 100644 --- a/apps/emqx_resource/src/emqx_resource_instance.erl +++ b/apps/emqx_resource/src/emqx_resource_instance.erl @@ -103,17 +103,17 @@ init({Pool, Id}) -> handle_call({create, InstId, ResourceType, Config, Opts}, _From, State) -> {reply, do_create(InstId, ResourceType, Config, Opts), State}; -handle_call({create_dry_run, InstId, ResourceType, Config}, _From, State) -> - {reply, do_create_dry_run(InstId, ResourceType, Config), State}; +handle_call({create_dry_run, ResourceType, Config}, _From, State) -> + {reply, do_create_dry_run(ResourceType, Config), State}; -handle_call({recreate, InstId, ResourceType, Config, Params}, _From, State) -> - {reply, do_recreate(InstId, ResourceType, Config, Params), State}; +handle_call({recreate, InstId, ResourceType, Config, Opts}, _From, State) -> + {reply, do_recreate(InstId, ResourceType, Config, Opts), State}; handle_call({remove, InstId}, _From, State) -> {reply, do_remove(InstId), State}; -handle_call({restart, InstId}, _From, State) -> - {reply, do_restart(InstId), State}; +handle_call({restart, InstId, Opts}, _From, State) -> + {reply, do_restart(InstId, Opts), State}; handle_call({stop, InstId}, _From, State) -> {reply, do_stop(InstId), State}; @@ -142,20 +142,17 @@ code_change(_OldVsn, State, _Extra) -> %% suppress the race condition check, as these functions are protected in gproc workers -dialyzer({nowarn_function, [do_recreate/4, do_create/4, - do_restart/1, + do_restart/2, do_stop/1, do_health_check/1]}). 
-do_recreate(InstId, ResourceType, NewConfig, Params) -> +do_recreate(InstId, ResourceType, NewConfig, Opts) -> case lookup(InstId) of - {ok, #{mod := ResourceType, state := ResourceState, config := OldConfig}} -> - Config = emqx_resource:call_config_merge(ResourceType, OldConfig, - NewConfig, Params), - TestInstId = make_test_id(), - case do_create_dry_run(TestInstId, ResourceType, Config) of + {ok, #{mod := ResourceType} = Data} -> + case do_create_dry_run(ResourceType, NewConfig) of ok -> - do_remove(ResourceType, InstId, ResourceState, false), - do_create(InstId, ResourceType, Config, #{force_create => true}); + do_remove(Data, false), + do_create(InstId, ResourceType, NewConfig, Opts#{force_create => true}); Error -> Error end; @@ -166,100 +163,86 @@ do_recreate(InstId, ResourceType, NewConfig, Params) -> end. do_create(InstId, ResourceType, Config, Opts) -> - ForceCreate = maps:get(force_create, Opts, false), case lookup(InstId) of {ok, _} -> {ok, already_created}; - _ -> - Res0 = #{id => InstId, mod => ResourceType, config => Config, - status => starting, state => undefined}, - %% The `emqx_resource:call_start/3` need the instance exist beforehand - ets:insert(emqx_resource_instance, {InstId, Res0}), - case emqx_resource:call_start(InstId, ResourceType, Config) of - {ok, ResourceState} -> - ok = emqx_plugin_libs_metrics:create_metrics(resource_metrics, InstId), - %% this is the first time we do health check, this will update the - %% status and then do ets:insert/2 - _ = do_health_check(Res0#{state => ResourceState}), - HealthCheckInterval = maps:get(health_check_interval, Opts, 15000), - emqx_resource_health_check_sup:create_health_check_process(InstId, HealthCheckInterval), + {error, not_found} -> + case do_start(InstId, ResourceType, Config, Opts) of + ok -> + ok = emqx_resource_health_check_sup:create_checker(InstId, + maps:get(health_check_interval, Opts, 15000)), + ok = emqx_plugin_libs_metrics:clear_metrics(resource_metrics, InstId), {ok, 
force_lookup(InstId)}; - {error, Reason} when ForceCreate == true -> - logger:error("start ~ts resource ~ts failed: ~p, " - "force_create it as a stopped resource", - [ResourceType, InstId, Reason]), - ets:insert(emqx_resource_instance, {InstId, Res0}), - {ok, Res0}; - {error, Reason} when ForceCreate == false -> - ets:delete(emqx_resource_instance, InstId), - {error, Reason} + Error -> Error end end. -do_create_dry_run(InstId, ResourceType, Config) -> - case emqx_resource:call_start(InstId, ResourceType, Config) of - {ok, ResourceState0} -> - Return = case emqx_resource:call_health_check(InstId, ResourceType, ResourceState0) of - {ok, ResourceState1} -> ok; - {error, Reason, ResourceState1} -> - {error, Reason} - end, - _ = emqx_resource:call_stop(InstId, ResourceType, ResourceState1), +do_create_dry_run(ResourceType, Config) -> + InstId = make_test_id(), + Opts = #{force_create => false}, + case do_create(InstId, ResourceType, Config, Opts) of + {ok, Data} -> + Return = do_health_check(Data), + _ = do_remove(Data), Return; {error, Reason} -> {error, Reason} end. -do_remove(InstId) -> - case lookup(InstId) of - {ok, #{mod := Mod, state := ResourceState}} -> - do_remove(Mod, InstId, ResourceState); - Error -> - Error - end. +do_remove(Instance) -> + do_remove(Instance, true). -do_remove(Mod, InstId, ResourceState) -> - do_remove(Mod, InstId, ResourceState, true). 
- -do_remove(Mod, InstId, ResourceState, ClearMetrics) -> - _ = emqx_resource:call_stop(InstId, Mod, ResourceState), +do_remove(InstId, ClearMetrics) when is_binary(InstId) -> + do_with_instance_data(InstId, fun do_remove/2, [ClearMetrics]); +do_remove(#{id := InstId} = Data, ClearMetrics) -> + _ = do_stop(Data), ets:delete(emqx_resource_instance, InstId), case ClearMetrics of true -> ok = emqx_plugin_libs_metrics:clear_metrics(resource_metrics, InstId); false -> ok end, - _ = emqx_resource_health_check_sup:delete_health_check_process(InstId), + _ = emqx_resource_health_check_sup:delete_checker(InstId), ok. -do_restart(InstId) -> +do_restart(InstId, Opts) -> case lookup(InstId) of - {ok, #{mod := Mod, state := ResourceState, config := Config} = Data} -> - _ = case ResourceState of - undefined -> ok; - _ -> emqx_resource:call_stop(InstId, Mod, ResourceState) - end, - case emqx_resource:call_start(InstId, Mod, Config) of - {ok, NewResourceState} -> - ets:insert(emqx_resource_instance, - {InstId, Data#{state => NewResourceState, status => started}}), - ok; - {error, Reason} -> - ets:insert(emqx_resource_instance, {InstId, Data#{status => stopped}}), - {error, Reason} - end; + {ok, #{mod := ResourceType, config := Config} = Data} -> + ok = do_stop(Data), + do_start(InstId, ResourceType, Config, Opts); Error -> Error end. 
-do_stop(InstId) -> - case lookup(InstId) of - {ok, #{mod := Mod, state := ResourceState} = Data} -> - _ = emqx_resource:call_stop(InstId, Mod, ResourceState), - ets:insert(emqx_resource_instance, {InstId, Data#{status => stopped}}), +do_start(InstId, ResourceType, Config, Opts) when is_binary(InstId) -> + ForceCreate = maps:get(force_create, Opts, false), + Res0 = #{id => InstId, mod => ResourceType, config => Config, + status => starting, state => undefined}, + %% The `emqx_resource:call_start/3` need the instance exist beforehand + ets:insert(emqx_resource_instance, {InstId, Res0}), + case emqx_resource:call_start(InstId, ResourceType, Config) of + {ok, ResourceState} -> + %% this is the first time we do health check, this will update the + %% status and then do ets:insert/2 + _ = do_health_check(Res0#{state => ResourceState}), ok; - Error -> - Error + {error, Reason} when ForceCreate == true -> + logger:warning("start ~ts resource ~ts failed: ~p, force_create it", + [ResourceType, InstId, Reason]), + ets:insert(emqx_resource_instance, {InstId, Res0}), + ok; + {error, Reason} when ForceCreate == false -> + ets:delete(emqx_resource_instance, InstId), + {error, Reason} end. +do_stop(InstId) when is_binary(InstId) -> + do_with_instance_data(InstId, fun do_stop/1, []); +do_stop(#{state := undefined}) -> + ok; +do_stop(#{id := InstId, mod := Mod, state := ResourceState} = Data) -> + _ = emqx_resource:call_stop(InstId, Mod, ResourceState), + ets:insert(emqx_resource_instance, {InstId, Data#{status => stopped}}), + ok. + do_health_check(InstId) when is_binary(InstId) -> case lookup(InstId) of {ok, Data} -> do_health_check(Data); @@ -284,6 +267,12 @@ do_health_check(#{id := InstId, mod := Mod, state := ResourceState0} = Data) -> %% internal functions %%------------------------------------------------------------------------------ +do_with_instance_data(InstId, Do, Args) -> + case lookup(InstId) of + {ok, Data} -> erlang:apply(Do, [Data | Args]); + Error -> Error + end. 
+ proc_name(Mod, Id) -> list_to_atom(lists:concat([Mod, "_", Id])). diff --git a/apps/emqx_resource/src/emqx_resource_sup.erl b/apps/emqx_resource/src/emqx_resource_sup.erl index b5655e301..99d601ec4 100644 --- a/apps/emqx_resource/src/emqx_resource_sup.erl +++ b/apps/emqx_resource/src/emqx_resource_sup.erl @@ -49,7 +49,7 @@ init([]) -> #{id => emqx_resource_health_check_sup, start => {emqx_resource_health_check_sup, start_link, []}, restart => transient, - shutdown => 5000, type => supervisor, modules => [emqx_resource_health_check_sup]}, + shutdown => infinity, type => supervisor, modules => [emqx_resource_health_check_sup]}, {ok, {SupFlags, [HealthCheck, Metrics | ResourceInsts]}}. %% internal functions