diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index 1e11046db..d4fc3df2d 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -192,7 +192,7 @@ create(Type, Name, Conf) -> ?SLOG(info, #{msg => "create bridge", type => Type, name => Name, config => Conf}), case emqx_resource:create_local(resource_id(Type, Name), emqx_bridge:resource_type(Type), - parse_confs(Type, Name, Conf)) of + parse_confs(Type, Name, Conf), #{force_create => true}) of {ok, already_created} -> maybe_disable_bridge(Type, Name, Conf); {ok, _} -> maybe_disable_bridge(Type, Name, Conf); {error, Reason} -> {error, Reason} diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index e5eb1785f..08c230401 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -29,6 +29,12 @@ metrics := emqx_plugin_libs_metrics:metrics() }. -type resource_group() :: binary(). +-type create_opts() :: #{ + %% The emqx_resource:create/4 will return OK event if the Mod:on_start/2 fails, + %% the 'status' of the resource will be 'stopped' in this case. + %% Defaults to 'false' + force_create => boolean() + }. -type after_query() :: {[OnSuccess :: after_query_fun()], [OnFailed :: after_query_fun()]} | undefined. diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index fd4046505..37c4caa2e 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -33,7 +33,9 @@ -export([ check_config/2 , check_and_create/3 + , check_and_create/4 , check_and_create_local/3 + , check_and_create_local/4 , check_and_recreate/4 , check_and_recreate_local/4 ]). @@ -42,7 +44,9 @@ %% provisional solution: rpc:multical to all the nodes for creating/updating/removing %% todo: replicate operations -export([ create/3 %% store the config and start the instance + , create/4 , create_local/3 + , create_local/4 , create_dry_run/2 %% run start/2, health_check/2 and stop/1 sequentially , create_dry_run_local/2 , recreate/4 %% this will do create_dry_run, stop the old instance and start a new one @@ -141,12 +145,22 @@ apply_query_after_calls(Funcs) -> -spec create(instance_id(), resource_type(), resource_config()) -> {ok, resource_data() | 'already_created'} | {error, Reason :: term()}. create(InstId, ResourceType, Config) -> - cluster_call(create_local, [InstId, ResourceType, Config]). + create(InstId, ResourceType, Config, #{}). + +-spec create(instance_id(), resource_type(), resource_config(), create_opts()) -> + {ok, resource_data() | 'already_created'} | {error, Reason :: term()}. +create(InstId, ResourceType, Config, Opts) -> + cluster_call(create_local, [InstId, ResourceType, Config, Opts]). -spec create_local(instance_id(), resource_type(), resource_config()) -> {ok, resource_data() | 'already_created'} | {error, Reason :: term()}. create_local(InstId, ResourceType, Config) -> - call_instance(InstId, {create, InstId, ResourceType, Config}). + create_local(InstId, ResourceType, Config, #{}). + +-spec create_local(instance_id(), resource_type(), resource_config(), create_opts()) -> + {ok, resource_data() | 'already_created'} | {error, Reason :: term()}. +create_local(InstId, ResourceType, Config, Opts) -> + call_instance(InstId, {create, InstId, ResourceType, Config, Opts}). -spec create_dry_run(resource_type(), resource_config()) -> ok | {error, Reason :: term()}. @@ -294,14 +308,24 @@ check_config(ResourceType, RawConfigTerm) -> -spec check_and_create(instance_id(), resource_type(), raw_resource_config()) -> {ok, resource_data() | 'already_created'} | {error, term()}. check_and_create(InstId, ResourceType, RawConfig) -> + check_and_create(InstId, ResourceType, RawConfig, #{}). + +-spec check_and_create(instance_id(), resource_type(), raw_resource_config(), create_opts()) -> + {ok, resource_data() | 'already_created'} | {error, term()}. +check_and_create(InstId, ResourceType, RawConfig, Opts) -> check_and_do(ResourceType, RawConfig, - fun(InstConf) -> create(InstId, ResourceType, InstConf) end). + fun(InstConf) -> create(InstId, ResourceType, InstConf, Opts) end). -spec check_and_create_local(instance_id(), resource_type(), raw_resource_config()) -> {ok, resource_data()} | {error, term()}. check_and_create_local(InstId, ResourceType, RawConfig) -> + check_and_create_local(InstId, ResourceType, RawConfig, #{}). + +-spec check_and_create_local(instance_id(), resource_type(), raw_resource_config(), + create_opts()) -> {ok, resource_data()} | {error, term()}. +check_and_create_local(InstId, ResourceType, RawConfig, Opts) -> check_and_do(ResourceType, RawConfig, - fun(InstConf) -> create_local(InstId, ResourceType, InstConf) end). + fun(InstConf) -> create_local(InstId, ResourceType, InstConf, Opts) end). -spec check_and_recreate(instance_id(), resource_type(), raw_resource_config(), term()) -> {ok, resource_data()} | {error, term()}. diff --git a/apps/emqx_resource/src/emqx_resource_instance.erl b/apps/emqx_resource/src/emqx_resource_instance.erl index 708dc6030..497affa5e 100644 --- a/apps/emqx_resource/src/emqx_resource_instance.erl +++ b/apps/emqx_resource/src/emqx_resource_instance.erl @@ -26,7 +26,6 @@ -export([ lookup/1 , get_metrics/1 , list_all/0 - , create_local/3 ]). -export([ hash_call/2 @@ -85,15 +84,6 @@ list_all() -> error:badarg -> [] end. - --spec create_local(instance_id(), resource_type(), resource_config()) -> - {ok, resource_data()} | {error, term()}. -create_local(InstId, ResourceType, InstConf) -> - case hash_call(InstId, {create, InstId, ResourceType, InstConf}, 15000) of - {ok, Data} -> {ok, Data}; - Error -> Error - end. - %%------------------------------------------------------------------------------ %% gen_server callbacks %%------------------------------------------------------------------------------ @@ -105,8 +95,8 @@ init({Pool, Id}) -> true = gproc_pool:connect_worker(Pool, {Pool, Id}), {ok, #state{worker_pool = Pool, worker_id = Id}}. -handle_call({create, InstId, ResourceType, Config}, _From, State) -> - {reply, do_create(InstId, ResourceType, Config), State}; +handle_call({create, InstId, ResourceType, Config, Opts}, _From, State) -> + {reply, do_create(InstId, ResourceType, Config, Opts), State}; handle_call({create_dry_run, InstId, ResourceType, Config}, _From, State) -> {reply, do_create_dry_run(InstId, ResourceType, Config), State}; @@ -146,7 +136,7 @@ code_change(_OldVsn, State, _Extra) -> %% suppress the race condition check, as these functions are protected in gproc workers -dialyzer({nowarn_function, [do_recreate/4, - do_create/3, + do_create/4, do_restart/1, do_stop/1, do_health_check/1]}). @@ -160,7 +150,7 @@ do_recreate(InstId, ResourceType, NewConfig, Params) -> case do_create_dry_run(TestInstId, ResourceType, Config) of ok -> do_remove(ResourceType, InstId, ResourceState), - do_create(InstId, ResourceType, Config); + do_create(InstId, ResourceType, Config, #{force_create => true}); Error -> Error end; @@ -170,7 +160,8 @@ do_recreate(InstId, ResourceType, NewConfig, Params) -> {error, not_found} end. -do_create(InstId, ResourceType, Config) -> +do_create(InstId, ResourceType, Config, Opts) -> + ForceCreate = maps:get(force_create, Opts, false), case lookup(InstId) of {ok, _} -> {ok, already_created}; _ -> @@ -183,11 +174,14 @@ do_create(InstId, ResourceType, Config) -> %% status and then do ets:insert/2 _ = do_health_check(Res0#{state => ResourceState}), {ok, force_lookup(InstId)}; - {error, Reason} -> - logger:error("start ~ts resource ~ts failed: ~p", + {error, Reason} when ForceCreate == true -> + logger:error("start ~ts resource ~ts failed: ~p, " + "force_create it as a stopped resource", [ResourceType, InstId, Reason]), ets:insert(emqx_resource_instance, {InstId, Res0}), - {ok, Res0} + {ok, Res0}; + {error, Reason} when ForceCreate == false -> + {error, Reason} end end. diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index 4f641c85a..6b2e5903e 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -142,10 +142,7 @@ t_stop_start(_) -> ?assertNot(is_process_alive(Pid0)), - ?assertException( - error, - {?ID, stopped}, - emqx_resource:query(?ID, get_state)), + ?assertException(error, {resource_stopped, ?ID}, emqx_resource:query(?ID, get_state)), ok = emqx_resource:restart(?ID),