diff --git a/apps/emqx_authn/src/emqx_authn_utils.erl b/apps/emqx_authn/src/emqx_authn_utils.erl index bcc6f4e0e..232cf5ac1 100644 --- a/apps/emqx_authn/src/emqx_authn_utils.erl +++ b/apps/emqx_authn/src/emqx_authn_utils.erl @@ -59,16 +59,14 @@ create_resource(ResourceId, Module, Config) -> ). update_resource(Module, Config, ResourceId) -> - %% recreate before maybe stop - %% resource will auto start during recreate - Result = emqx_resource:recreate_local(ResourceId, Module, Config), - case Config of - #{enable := true} -> - Result; - #{enable := false} -> - ok = emqx_resource:stop(ResourceId), - Result - end. + Opts = #{start_after_created => false}, + Result = emqx_resource:recreate_local(ResourceId, Module, Config, Opts), + _ = + case Config of + #{enable := true} -> emqx_resource:start(ResourceId); + #{enable := false} -> ok + end, + Result. check_password_from_selected_map(_Algorithm, _Selected, undefined) -> {error, bad_username_or_password}; diff --git a/apps/emqx_connector/src/emqx_connector_mongo.erl b/apps/emqx_connector/src/emqx_connector_mongo.erl index 5c10d0a6f..4396db933 100644 --- a/apps/emqx_connector/src/emqx_connector_mongo.erl +++ b/apps/emqx_connector/src/emqx_connector_mongo.erl @@ -38,7 +38,7 @@ -export([mongo_query/5, check_worker_health/1]). --define(HEALTH_CHECK_TIMEOUT, 10000). +-define(HEALTH_CHECK_TIMEOUT, 30000). %% mongo servers don't need parse -define(MONGO_HOST_OPTIONS, #{ diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index d1d82a401..f9256ac7c 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -33,7 +33,15 @@ -type create_opts() :: #{ health_check_interval => integer(), health_check_timeout => integer(), + %% We can choose to block the return of emqx_resource:start until + %% the resource connected, wait max to `wait_for_resource_ready` ms. wait_for_resource_ready => integer(), + %% If `start_after_created` is set to true, the resource is started right + %% after it is created. But note that a `started` resource is not guaranteed + %% to be `connected`. + start_after_created => boolean(), + %% If the resource disconnected, we can set to retry starting the resource + %% periodically. auto_retry_interval => integer() }. -type after_query() :: diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index 2a3f29122..4837161ae 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -70,8 +70,9 @@ %% Calls to the callback module with current resource state %% They also save the state after the call finished (except query/2,3). -%% restart the instance. -export([ + start/1, + start/2, restart/1, restart/2, %% verify if the resource is working normally @@ -261,11 +262,19 @@ query(InstId, Request, AfterQuery) -> erlang:raise(Err, Reason, ST) end; {ok, _Group, _Data} -> - query_error(not_found, <<"resource not connected">>); + query_error(not_connected, <<"resource not connected">>); {error, not_found} -> query_error(not_found, <<"resource not found">>) end. +-spec start(instance_id()) -> ok | {error, Reason :: term()}. +start(InstId) -> + start(InstId, #{}). + +-spec start(instance_id(), create_opts()) -> ok | {error, Reason :: term()}. +start(InstId, Opts) -> + emqx_resource_manager:start(InstId, Opts). + -spec restart(instance_id()) -> ok | {error, Reason :: term()}. restart(InstId) -> restart(InstId, #{}). diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index bcc4422b2..54439f62e 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -96,7 +96,10 @@ create(InstId, Group, ResourceType, Config, Opts) -> [matched, success, failed, exception], [matched] ), - wait_for_resource_ready(InstId, maps:get(wait_for_resource_ready, Opts, 5000)), + case maps:get(start_after_created, Opts, true) of + true -> wait_for_resource_ready(InstId, maps:get(wait_for_resource_ready, Opts, 5000)); + false -> ok + end, ok. %% @doc Called from `emqx_resource` when doing a dry run for creating a resource instance. @@ -108,9 +111,9 @@ create(InstId, Group, ResourceType, Config, Opts) -> create_dry_run(ResourceType, Config) -> InstId = make_test_id(), ok = emqx_resource_manager_sup:ensure_child(InstId, <<"dry_run">>, ResourceType, Config, #{}), - case wait_for_resource_ready(InstId, 5000) of + case wait_for_resource_ready(InstId, 15000) of ok -> - _ = remove(InstId); + remove(InstId); timeout -> _ = remove(InstId), {error, timeout} @@ -123,7 +126,9 @@ recreate(InstId, ResourceType, NewConfig, Opts) -> case lookup(InstId) of {ok, Group, #{mod := ResourceType, status := _} = _Data} -> _ = remove(InstId, false), - ensure_resource(InstId, Group, ResourceType, NewConfig, Opts); + create(InstId, Group, ResourceType, NewConfig, Opts), + {ok, _Group, Data} = lookup(InstId), + {ok, Data}; {ok, _, #{mod := Mod}} when Mod =/= ResourceType -> {error, updating_to_incorrect_resource_type}; {error, not_found} -> @@ -151,7 +156,7 @@ restart(InstId, Opts) when is_binary(InstId) -> Error end. -%% @doc Stop the resource +%% @doc Start the resource -spec start(instance_id(), create_opts()) -> ok | {error, Reason :: term()}. start(InstId, Opts) -> case safe_call(InstId, start, ?T_OPERATION) of @@ -162,7 +167,7 @@ start(InstId, Opts) -> Error end. -%% @doc Start the resource +%% @doc Stop the resource -spec stop(instance_id()) -> ok | {error, Reason :: term()}. stop(InstId) -> case safe_call(InstId, stop, ?T_OPERATION) of @@ -240,13 +245,16 @@ start_link(InstId, Group, ResourceType, Config, Opts) -> state = undefined, error = undefined }, - gen_statem:start_link({local, proc_name(InstId)}, ?MODULE, Data, []). + gen_statem:start_link({local, proc_name(InstId)}, ?MODULE, {Data, Opts}, []). -init(Data) -> +init({Data, Opts}) -> process_flag(trap_exit, true), %% init the cache so that lookup/1 will always return something ets:insert(?ETS_TABLE, {Data#data.id, Data#data.group, Data}), - {ok, connecting, Data, {next_event, internal, try_connect}}. + case maps:get(start_after_created, Opts, true) of + true -> {ok, connecting, Data, {next_event, internal, start_resource}}; + false -> {ok, stopped, Data} + end. terminate(_Reason, _State, Data) -> _ = maybe_clear_alarm(Data#data.id), @@ -296,7 +304,7 @@ handle_event(enter, _OldState, connecting, Data) -> ets:insert(?ETS_TABLE, {Data#data.id, Data#data.group, Data}), Actions = [{state_timeout, 0, health_check}], {keep_state_and_data, Actions}; -handle_event(internal, try_connect, connecting, Data) -> +handle_event(internal, start_resource, connecting, Data) -> start_resource(Data, undefined); handle_event(state_timeout, health_check, connecting, Data) -> handle_connecting_health_check(Data); diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index 2519bc7d3..6da68a537 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -126,8 +126,46 @@ t_create_remove_local(_) -> ok = emqx_resource:remove_local(?ID), {error, _} = emqx_resource:remove_local(?ID), + ?assertMatch( + {error, {emqx_resource, #{reason := not_found}}}, + emqx_resource:query(?ID, get_state) + ), ?assertNot(is_process_alive(Pid)). +t_do_not_start_after_created(_) -> + {ok, _} = emqx_resource:create_local( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{name => test_resource}, + #{start_after_created => false} + ), + %% the resource should remain `disconnected` after created + timer:sleep(200), + ?assertMatch( + {error, {emqx_resource, #{reason := not_connected}}}, + emqx_resource:query(?ID, get_state) + ), + ?assertMatch( + {ok, _, #{status := disconnected}}, + emqx_resource:get_instance(?ID) + ), + + %% start the resource manually.. + ok = emqx_resource:start(?ID), + #{pid := Pid} = emqx_resource:query(?ID, get_state), + ?assert(is_process_alive(Pid)), + + %% restart the resource + ok = emqx_resource:restart(?ID), + ?assertNot(is_process_alive(Pid)), + #{pid := Pid2} = emqx_resource:query(?ID, get_state), + ?assert(is_process_alive(Pid2)), + + ok = emqx_resource:remove_local(?ID), + + ?assertNot(is_process_alive(Pid2)). + t_query(_) -> {ok, _} = emqx_resource:create_local( ?ID, @@ -231,7 +269,7 @@ t_stop_start(_) -> ?assertNot(is_process_alive(Pid0)), ?assertMatch( - {error, {emqx_resource, #{reason := not_found}}}, + {error, {emqx_resource, #{reason := not_connected}}}, emqx_resource:query(?ID, get_state) ), @@ -273,7 +311,7 @@ t_stop_start_local(_) -> ?assertNot(is_process_alive(Pid0)), ?assertMatch( - {error, {emqx_resource, #{reason := not_found}}}, + {error, {emqx_resource, #{reason := not_connected}}}, emqx_resource:query(?ID, get_state) ),